diff --git a/Cargo.toml b/Cargo.toml
index 102879a..c4bba56 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,9 +9,10 @@ axum = { version = "0.8.4", features = ["ws"] }
 dotenv = "0.15.0"
 futures = "0.3.31"
 lazy_static = "1.5.0"
-reqwest = { version = "0.12.20", features = ["rustls-tls"] }
+reqwest = { version = "0.12.20", features = ["json", "rustls-tls"] }
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.140"
 tokio = { version = "1.45.1", features = ["full"] }
 tracing = "0.1.41"
 tracing-subscriber = "0.3.19"
+utoipa-axum = "0.2.0"
diff --git a/README.md b/README.md
index bd53682..4d8dc9c 100644
--- a/README.md
+++ b/README.md
@@ -22,11 +22,11 @@ ML-API is a high-performance Rust service designed to process system metrics, an
 Create a `.env` file in your project root:
 
 ```env
-# ML API target URL (e.g., Ollama would use: http://localhost:11434/api/generate)
-ML_TARGET_URL="http://your-ml-backend/api"
-
-# Log level (TRACE|DEBUG|INFO|WARN|ERROR|OFF)
-ML_LOG_LEVEL="INFO"
+ML_TARGET_URL="http://url.to/ml/api"
+ML_MODEL_NAME="kis-test"
+ML_REQUEST_TIMEOUT="10"
+ML_API_LOG_LEVEL="INFO"
+ML_API_PORT="5134"
 ```
 
 ### Installation
@@ -64,10 +64,8 @@ Send your metrics as a JSON array in the following format:
   {
     "id": "10001",
     "name": "cpu_utilization",
-    "type": "i64",
-    "addr": "enode.monitoring.api",
-    "value": null,
     "description": "cpu_utilization",
+    "value": 1.23,
     "status": 0,
     "device": 18,
     "source": "module$11"
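For reference, the request body shown in the README hunk above is cut off by the diff context and also omits the `timestamp` field that the `InputMetric` schema in `src/schemas.rs` (further down in this diff) requires. A complete, illustrative payload would look like this (the values are made up):

```json
[
  {
    "id": "10001",
    "name": "cpu_utilization",
    "description": "cpu_utilization",
    "value": 1.23,
    "status": 0,
    "device": 18,
    "source": "module$11",
    "timestamp": 1718000000
  }
]
```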
diff --git a/src/endpoints.rs b/src/endpoints.rs
index 144e600..441c0fd 100644
--- a/src/endpoints.rs
+++ b/src/endpoints.rs
@@ -1,5 +1,13 @@
-use axum::response::IntoResponse;
-use axum::http::StatusCode;
+use axum::{
+    response::IntoResponse,
+    http::StatusCode,
+    extract::{ Json, State },
+};
+use crate::{models::ModelResponse, schemas::InputMetric};
+use crate::models::{ApiSessionConfig, OutputModel};
+use std::sync::Arc;
+
+type Config = Arc<ApiSessionConfig>;
 
 pub mod openapi {
     use super::{IntoResponse, StatusCode};
@@ -8,13 +16,97 @@
 }
 
 pub mod rest {
-    use super::{IntoResponse, StatusCode};
+    use tracing::trace;
+    use super::{IntoResponse, StatusCode, InputMetric, Json, Config, State};
 
-    pub async fn model_rest_handler() -> impl IntoResponse { (StatusCode::NOT_IMPLEMENTED, "still in development ...") }
+    pub async fn model_rest_handler(
+        State(config) : State<Config>,
+        Json(req) : Json<Vec<InputMetric>>,
+    ) -> impl IntoResponse {
+        trace!("GET on /api/metrics/rest");
+        return match super::send_message_to_model(config.clone(), req).await {
+            Ok(resp) => (StatusCode::OK, resp),
+            Err(er) => (StatusCode::INTERNAL_SERVER_ERROR, format!("cannot get model response: {er}")),
+        }
+    }
 }
 
 pub mod ws {
-    use super::{IntoResponse, StatusCode};
+    use axum::extract::{ws::{Message, WebSocket}, WebSocketUpgrade};
+    use tracing::trace;
+    use super::{IntoResponse, InputMetric, Config, State};
 
-    pub async fn model_ws_handler() -> impl IntoResponse { (StatusCode::NOT_IMPLEMENTED, "still in development ...") }
+    pub async fn model_ws_handler(
+        State(config): State<Config>,
+        ws: WebSocketUpgrade,
+    ) -> impl IntoResponse {
+        trace!("working with new WebSocket connection ...");
+        ws.on_upgrade(|socket| model_ws_worker(config, socket))
+    }
+
+    async fn model_ws_worker(
+        config: Config,
+        mut ws: WebSocket,
+    ) {
+        trace!("handling WebSocket connection ...");
+        let ws_receiver = tokio::spawn(async move {
+            while let Some(Ok(msg)) = ws.recv().await {
+                match msg.to_text() {
+                    Err(er) => {
+                        let _ = ws.send(Message::Text(format!("Cannot convert input message: {er}").into())).await;
+                    },
+                    Ok(msg) => {
+                        match serde_json::from_str::<Vec<InputMetric>>(msg) {
+                            Err(er) => {
+                                let _ = ws.send(
+                                    Message::Text(
+                                        format!("Cannot convert input message: {er}")
+                                            .into()
+                                    )
+                                ).await;
+                            },
+                            Ok(req) => {
+                                match super::send_message_to_model(config.clone(), req).await {
+                                    Ok(resp) => {
+                                        let _ = ws.send(Message::Text(resp.into())).await;
+                                    },
+                                    Err(er) => {
+                                        let _ = ws.send(Message::Text(format!("Cannot get model's response: {er}").into())).await;
+                                    },
+                                }
+                            },
+                        }
+                    },
+                }
+            }
+        });
+
+        let _ = ws_receiver.await;
+    }
+}
+
+async fn send_message_to_model(
+    config: Config,
+    req: Vec<InputMetric>
+) -> anyhow::Result<String> {
+    let prompt = OutputModel::build(
+        &config.model_name,
+        format!("{} {}", serde_json::to_string(&req)?, &*crate::PROMPT)
+    );
+    let request = config
+        .client
+        // TODO: REMOVE CLONE()
+        .clone()
+        .post(&config.target_url)
+        .json(&prompt)
+        .timeout(tokio::time::Duration::from_secs(config.request_timeout as u64));
+
+    Ok(
+        request
+            .send()
+            .await?
+            .json::<ModelResponse>()
+            .await?
+            .response
+    )
 }
\ No newline at end of file
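As a usage illustration only (not part of this change set), a minimal client for the new REST route could look like the sketch below. It assumes the service listens on localhost at the `ML_API_PORT` from the README example (5134) and that the client side has `tokio`, `serde_json`, and `reqwest` with the `json` feature available. The route is registered with `get` in `src/main.rs` while the handler still expects a JSON body, so the sketch sends a GET with an explicit body; registering the route with `post` would be the more conventional choice.

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Same array shape as the README example, including the required `timestamp`.
    let metrics = json!([{
        "id": "10001",
        "name": "cpu_utilization",
        "description": "cpu_utilization",
        "value": 1.23,
        "status": 0,
        "device": 18,
        "source": "module$11",
        "timestamp": 1718000000u64
    }]);

    // GET with a JSON body, matching how the route is currently registered.
    let reply = reqwest::Client::new()
        .get("http://localhost:5134/api/metrics/rest")
        .json(&metrics)
        .send()
        .await?
        .text()
        .await?;

    println!("{reply}");
    Ok(())
}
```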
diff --git a/src/main.rs b/src/main.rs
index de198b4..54aa64e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@ mod models;
 mod setup;
 
 use std::sync::Arc;
-use tracing::info;
+use tracing::{debug, info, trace};
 use dotenv::dotenv;
 use endpoints::{
     openapi::swagger,
@@ -12,10 +12,15 @@ use endpoints::{
     ws::model_ws_handler,
 };
 use setup::setup_self_config;
-use schemas::ApiSessionConfig;
+use models::ApiSessionConfig;
 use axum::{ routing::get, Router};
+use lazy_static::lazy_static;
+
+lazy_static! {
+    static ref PROMPT: String = String::from("\n проанализируй ВЕСЬ этот список метрик системы, обращая внимание на статусы КАЖДОЙ МЕТРИКИ:\n 0 - не определен \n 1 - норма \n 2 - предупреждение \n 3 - критично \n 4 - авария \n напиши общую подробную оценку системы с указанием вообще всех слабых мест и рекомендаций по устранению проблем (ВАЖНО!!! : только на русском языке без форматирования и лишнего текста, без комментариев, только оценка и рекомендации) \n ВАЖНО! НЕ ИСПОЛЬЗУЙ MD-форматирование выходного текста, анализируй метрики выше и по ним делай выводы и рекомендации");
+}
 
 #[tokio::main(flavor = "multi_thread")]
 async fn main() -> anyhow::Result<()> {
@@ -25,14 +30,17 @@
     let base_url = format!("0.0.0.0:{}", port);
 
     // routing
+    trace!("configuring routing ...");
     let api = Router::new()
         .route("/rest", get(model_rest_handler))
-        .route("/ws", get(model_ws_handler));
+        .route("/ws", get(model_ws_handler))
+        .with_state(app_state.clone());
 
     let router = Router::new()
-        .nest("/api", api)
-        .route("/swagger", get(swagger))
-        .with_state(app_state.clone());
+        .nest("/api/metrics", api)
+        .route("/swagger", get(swagger));
+
+    debug!("router for app : {:?}", router);
 
     // running
     info!("serving on {} ...", &base_url);
diff --git a/src/models.rs b/src/models.rs
index e69de29..16a6dbd 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -0,0 +1,81 @@
+use reqwest::Client;
+use tracing::{debug, instrument, warn, info};
+use std::fmt;
+use serde::{Deserialize, Serialize};
+
+///
+#[derive(Debug)]
+pub struct ApiSessionConfig {
+    // integration config
+    pub target_url : String,
+    pub model_name : String,
+    pub request_timeout : usize,
+    pub client : Client,
+}
+
+impl fmt::Display for ApiSessionConfig {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        let param_name_width = 25;
+        let param_name_val = (self.model_name.as_bytes().iter().count() + 4).max(self.target_url.as_bytes().iter().count() + 4);
+        let param_name_measure = 10;
+
+        // header
+        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")?;
+        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "parameter", "value", "measure")?;
+        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")?;
+        // values
+        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "target_url", self.target_url, "-")?;
+        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "model_name", self.model_name, "-")?;
+        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "request_timeout", self.request_timeout, "sec")?;
+        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")
+    }
+}
+
+impl ApiSessionConfig {
+    #[instrument]
+    pub fn configure() -> anyhow::Result<Self> {
+        let config = Self {
+            target_url : std::env::var("ML_TARGET_URL")?,
+            model_name : std::env::var("ML_MODEL_NAME")?,
+            request_timeout : std::env::var("ML_REQUEST_TIMEOUT")?.parse().unwrap_or_else(|_| {
+                warn!("invalid timeout was given, setting up default one ...");
+                10
+            }),
+            client : Client::new(),
+        };
+        debug!("{:?}", config);
+        info!("app is configured. config :\n{}", config);
+        Ok(config)
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct OutputModel {
+    model : String,
+    prompt : String,
+    stream : bool,
+}
+
+impl OutputModel {
+    pub fn build(model_name: &str, prompt: String) -> Self {
+        Self {
+            model : model_name.to_owned(),
+            prompt : prompt,
+            stream : false,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ModelResponse {
+    pub response: String,
+}
\ No newline at end of file
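For context on `OutputModel` and `ModelResponse`: they mirror the request and response bodies of an Ollama-style `/api/generate` backend, the kind of URL the removed README comment pointed at. A rough sketch of both shapes, with made-up values:

```rust
use serde_json::json;

fn main() {
    // Body that send_message_to_model() POSTs to ML_TARGET_URL.
    let request = json!({
        "model": "kis-test",
        "prompt": "<metrics JSON array> <analysis instructions from PROMPT>",
        "stream": false
    });

    // Body the service expects back; only the `response` text is used.
    let response = json!({
        "response": "общая оценка системы и рекомендации ..."
    });

    println!("{request}\n{response}");
}
```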
diff --git a/src/schemas.rs b/src/schemas.rs
index d59a695..49544ae 100644
--- a/src/schemas.rs
+++ b/src/schemas.rs
@@ -1,55 +1,13 @@
-use tracing::{debug, instrument, warn, info};
-use std::fmt;
+use serde::{Serialize, Deserialize};
 
-///
-#[derive(Debug)]
-pub struct ApiSessionConfig {
-    // integration config
-    target_url : String,
-    model_name : String,
-    request_timeout : usize,
-}
-
-impl fmt::Display for ApiSessionConfig {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
-        let param_name_width = 25;
-        let param_name_val = (self.model_name.as_bytes().iter().count() + 4).max(self.target_url.as_bytes().iter().count() + 4);
-        let param_name_measure = 10;
-
-        // header
-        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")?;
-        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "parameter", "value", "measure")?;
-        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")?;
-        // values
-        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "target_url", self.target_url, "-")?;
-        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "model_name", self.model_name, "-")?;
-        writeln!(f, "|{:^param_name_width$}|{:^param_name_val$}|{:^param_name_measure$}|", "request_timeout", self.request_timeout, "sec")?;
-        writeln!(f, "+{:-<param_name_width$}+{:-<param_name_val$}+{:-<param_name_measure$}+", "", "", "")
-    }
-}
-
-impl ApiSessionConfig {
-    #[instrument]
-    pub fn configure() -> anyhow::Result<Self> {
-        let config = Self {
-            target_url : std::env::var("ML_TARGET_URL")?,
-            model_name : std::env::var("ML_MODEL_NAME")?,
-            request_timeout : std::env::var("ML_REQUEST_TIMEOUT")?.parse().unwrap_or_else(|_| {
-                warn!("invalid port was given, setting up default one ...");
-                10
-            }),
-        };
-        debug!("{:?}", config);
-        info!("app is configurated. config :\n{}", config);
-        Ok(config)
-    }
-}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct InputMetric {
+    id : String,
+    name : String,
+    description : String,
+    value : serde_json::Value,
+    status : usize,
+    device : usize,
+    source : String,
+    timestamp : u64,
 }
\ No newline at end of file
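Finally, an illustrative client sketch for the WebSocket route added in `src/endpoints.rs` (again, not part of the change set). It assumes the same localhost port and uses `tokio-tungstenite`, `futures`, and `anyhow` on the client side, none of which this diff adds for that purpose; the server answers each metrics array with the model's plain-text assessment or an error description.

```rust
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (mut ws, _) = connect_async("ws://localhost:5134/api/metrics/ws").await?;

    // The same JSON array the REST route accepts, serialized as text.
    let metrics = r#"[{"id":"10001","name":"cpu_utilization","description":"cpu_utilization","value":1.23,"status":0,"device":18,"source":"module$11","timestamp":1718000000}]"#;
    ws.send(Message::Text(metrics.into())).await?;

    // First reply carries the analysis (or an error message) from the model.
    if let Some(reply) = ws.next().await {
        println!("{:?}", reply?);
    }
    Ok(())
}
```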