diff --git a/logs.txt b/logs.txt new file mode 100644 index 0000000..c0969e7 Binary files /dev/null and b/logs.txt differ diff --git a/src/prometheus/Subscription.interface.ts b/src/prometheus/Subscription.interface.ts deleted file mode 100644 index e69de29..0000000 diff --git a/src/prometheus/index.ts b/src/prometheus/index.ts new file mode 100644 index 0000000..4213d99 --- /dev/null +++ b/src/prometheus/index.ts @@ -0,0 +1,5 @@ +// export * from './prometheus-metric.interface'; +// export * from './prometheus.service'; +// export * from './prometheus-cache.service'; +// export * from './prometheus-query.service'; +// export * from './prometheus-metadata.service'; \ No newline at end of file diff --git a/src/prometheus/metrics.gateway.ts b/src/prometheus/metrics.gateway.ts index 366b7c7..af8e1c7 100644 --- a/src/prometheus/metrics.gateway.ts +++ b/src/prometheus/metrics.gateway.ts @@ -1,23 +1,39 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Server, WebSocket } from 'ws'; import { createServer } from 'http'; import { PrometheusService } from './prometheus.service'; import { ConfigService } from '@nestjs/config'; +import { PrometheusMetric } from './prometheus-metric.interface'; type Filters = Record; +interface RealtimeSubscription { + clients: Set; + interval: NodeJS.Timeout; + metric: string; + filters: Filters; + lastData: PrometheusMetric[]; +} + +interface HistoricalRequest { + client: WebSocket; + requestId: string; +} + @Injectable() -export class MetricsGateway implements OnModuleInit { +export class MetricsGateway implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(MetricsGateway.name); private wss: Server; + private httpServer: ReturnType; - private activeSockets = new Map(); - private metricSubscriptions = new Map< - string, - { stopUpdates: () => void; clients: Set } - >(); + // Real-time подписки (одна на метрику, много клиентов) + private realtimeSubscriptions = new Map(); - private lastSentData = new Map(); + // Активные клиенты + private activeClients = new Map(); + + // Исторические запросы (для отслеживания) + private historicalRequests = new Map(); constructor( private readonly prometheusService: PrometheusService, @@ -25,367 +41,429 @@ export class MetricsGateway implements OnModuleInit { ) { } onModuleInit() { - const httpServer = createServer(); + this.httpServer = createServer(); this.wss = new Server({ - server: httpServer, + server: this.httpServer, path: '/metrics-ws', }); this.wss.on('connection', (client, request) => this.handleConnection(client, request) ); + this.wss.on('error', (err) => this.logger.error('WebSocket server error:', err) ); const wsPort = Number(this.configService.get('WS_PORT') || 3001); - httpServer.listen(wsPort, () => { + this.httpServer.listen(wsPort, () => { this.logger.log( - `WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws` + `WebSocket server running at ws://localhost:${wsPort}/metrics-ws` ); }); } + onModuleDestroy() { + // Очистка всех ресурсов + this.clearAllSubscriptions(); + this.wss?.close(); + this.httpServer?.close(); + } private handleConnection(client: WebSocket, request: any) { - let clientId = - this.getQueryParams(request?.url).clientId || - Math.random().toString(36).slice(2); + const clientId = this.getClientId(request?.url); + this.activeClients.set(clientId, client); - this.activeSockets.set(clientId, client); this.logger.log(`Client connected: ${clientId}`); + this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); client.on('message', (raw) => { try { - const msg = JSON.parse(raw.toString()); - this.handleMessage(clientId, client, msg); + const message = JSON.parse(raw.toString()); + this.handleMessage(clientId, client, message); } catch (err) { - this.sendError(client, 'Invalid JSON'); + this.sendError(client, 'Invalid JSON format'); } }); - client.on('close', () => this.cleanupClient(clientId)); + client.on('close', () => this.handleClientDisconnect(clientId)); client.on('error', (err) => { this.logger.error(`Client ${clientId} error:`, err); - this.cleanupClient(clientId); + this.handleClientDisconnect(clientId); + }); + + // Отправляем приветственное сообщение + this.sendMessage(client, { + event: 'connected', + data: { clientId, timestamp: Date.now() } }); } private handleMessage(clientId: string, client: WebSocket, message: any) { - const { event, data } = message || {}; - if (!event) return this.sendError(client, 'Event type is required'); + const { event, data, requestId } = message; + + if (!event) { + return this.sendError(client, 'Event type is required', requestId); + } + + this.logger.debug(`Received event: ${event} from client: ${clientId}`); switch (event) { + case 'subscribe-realtime': + return this.handleSubscribeRealtime(clientId, client, data, requestId); + + case 'unsubscribe-realtime': + return this.handleUnsubscribeRealtime(clientId, data, requestId); + case 'unsubscribe-all': - return this.unsubscribeAllForClient(clientId); + return this.handleUnsubscribeAll(clientId, requestId); - case 'get-metrics': - return this.handleGetMetrics(client, data); + case 'get-historical': + return this.handleGetHistorical(client, data, requestId); - case 'subscribe-metric': - return this.handleSubscribeMetric(clientId, client, data); - - case 'unsubscribe-metric': - return this.handleUnsubscribeMetric(clientId, data); + case 'get-current': + return this.handleGetCurrent(client, data, requestId); default: - return this.sendError(client, `Unknown event type: ${event}`); + return this.sendError(client, `Unknown event type: ${event}`, requestId); } } - private cleanupClient(clientId: string) { - if (!this.activeSockets.has(clientId)) return; - this.logger.log(`Client disconnected: ${clientId}`); - this.activeSockets.delete(clientId); - - setTimeout(() => { - if (!this.activeSockets.has(clientId)) { - } - }, 5000); - - for (const [key, sub] of this.metricSubscriptions) { - sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - } - - private unsubscribeAllForClient(clientId: string) { - for (const [key, sub] of this.metricSubscriptions) { - if (sub.clients.has(clientId)) sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - } - - - private async handleGetMetrics(client: WebSocket, payload: any) { - const { - metric, - start, - end, - step, - isRangeQuery, - requestId, - filters = {}, - } = payload || {}; + private async handleSubscribeRealtime( + clientId: string, + client: WebSocket, + payload: any, + requestId?: string + ) { + const { metric, filters = {}, interval = 10000 } = payload || {}; if (!metric) { return this.sendError(client, 'Metric name is required', requestId); } - if (isRangeQuery) { - try { - const rangeData = await this.prometheusService.fetchMetricsRange( - metric, - start, - end, - step, - filters - ); - this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200)); - - return this.sendMessage(client, { - event: 'metrics-data', - data: rangeData, - metric, - requestId, - }); - } catch (err: any) { - return this.sendError(client, err?.message || 'Range query error', requestId); - } - } + const subscriptionKey = this.getSubscriptionKey(metric, filters); try { - const subscriptionKey = this.getSubscriptionKey(metric, filters); + // Если подписка уже существует, просто добавляем клиента + if (this.realtimeSubscriptions.has(subscriptionKey)) { + const subscription = this.realtimeSubscriptions.get(subscriptionKey)!; + subscription.clients.add(clientId); - const initialData = - await this.prometheusService.fetchMetricsWithFilters(metric, filters); + this.logger.log(`Client ${clientId} added to existing subscription: ${subscriptionKey}`); + // Отправляем последние данные клиенту + if (subscription.lastData) { + this.sendMessage(client, { + event: 'realtime-data', + data: { + metric, + filters, + data: subscription.lastData, + type: 'initial' + }, + requestId + }); + } + return; + } + + // Создаем новую подписку + this.logger.log(`Creating new realtime subscription: ${subscriptionKey}`); + + // Первоначальная загрузка данных + const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + + if (!Array.isArray(initialData)) { + throw new Error(`Expected array for metric ${metric}, got ${typeof initialData}`); + } + + // Создаем интервал для обновлений + const intervalId = setInterval( + () => this.updateRealtimeData(subscriptionKey), + interval + ); + + // Сохраняем подписку + const subscription: RealtimeSubscription = { + clients: new Set([clientId]), + interval: intervalId, + metric, + filters, + lastData: initialData + }; + + this.realtimeSubscriptions.set(subscriptionKey, subscription); + + // Отправляем данные клиенту this.sendMessage(client, { - event: 'metrics-data', - data: { metric, data: initialData, requestId }, + event: 'realtime-data', + data: { + metric, + filters, + data: initialData, + type: 'initial' + }, + requestId }); - let lastLocal = initialData; + this.logger.debug(`Subscription created for ${subscriptionKey} with ${interval}ms interval`); - const jitter = Math.floor(Math.random() * 5000); - setTimeout(() => { - const timer = setInterval(async () => { - try { - const fresh = - await this.prometheusService.fetchMetricsWithFilters( - metric, - filters - ); - if (!this.isDataEqual(lastLocal, fresh)) { - this.sendMessage(client, { - event: 'metrics-data', - data: { metric, data: fresh, requestId }, - }); - lastLocal = fresh; - } - } catch (e) { - this.logger.error( - `Error in on-demand periodic update for ${subscriptionKey}:`, - (e as any)?.message - ); - } - }, step || 5000); - - const closeOnce = () => clearInterval(timer); - client.once('close', closeOnce); - client.once('error', closeOnce); - }, jitter); - } catch (err: any) { - return this.sendError(client, err?.message || 'get-metrics error', requestId); + } catch (error) { + this.logger.error(`Subscribe error for ${subscriptionKey}:`, error); + this.sendError(client, error.message, requestId); } } - private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) { - const { metric, interval = 60000, filters = {} } = payload || {}; + private async updateRealtimeData(subscriptionKey: string) { + const subscription = this.realtimeSubscriptions.get(subscriptionKey); + if (!subscription) return; + + try { + const freshData = await this.prometheusService.fetchMetricsWithFilters( + subscription.metric, + subscription.filters + ); + + if (!this.isDataEqual(subscription.lastData, freshData)) { + subscription.lastData = freshData; + + // Рассылаем обновление всем подписанным клиентам + this.broadcastToClients(Array.from(subscription.clients), { + event: 'realtime-data', + data: { + metric: subscription.metric, + filters: subscription.filters, + data: freshData, + type: 'update' + } + }); + + this.logger.debug(`Data updated for subscription: ${subscriptionKey}`); + } + } catch (error) { + this.logger.error(`Update error for ${subscriptionKey}:`, error); + } + } + + private handleUnsubscribeRealtime( + clientId: string, + payload: any, + requestId?: string + ) { + const { metric, filters = {} } = payload || {}; + if (!metric) { - this.sendError(client, 'Metric name is required'); return; } - const key = this.getSubscriptionKey(metric, filters); + const subscriptionKey = this.getSubscriptionKey(metric, filters); + const subscription = this.realtimeSubscriptions.get(subscriptionKey); + + if (!subscription) { + this.logger.debug(`No subscription found for: ${subscriptionKey}`); + return; + } + + // Удаляем клиента из подписки + subscription.clients.delete(clientId); + this.logger.log(`Client ${clientId} unsubscribed from: ${subscriptionKey}`); + + // Если больше нет клиентов, очищаем подписку + if (subscription.clients.size === 0) { + clearInterval(subscription.interval); + this.realtimeSubscriptions.delete(subscriptionKey); + this.logger.log(`Subscription removed: ${subscriptionKey}`); + } + } + + private handleUnsubscribeAll(clientId: string, requestId?: string) { + let unsubscribedCount = 0; + + for (const [key, subscription] of this.realtimeSubscriptions) { + if (subscription.clients.has(clientId)) { + subscription.clients.delete(clientId); + unsubscribedCount++; + + if (subscription.clients.size === 0) { + clearInterval(subscription.interval); + this.realtimeSubscriptions.delete(key); + } + } + } + + this.logger.log(`Client ${clientId} unsubscribed from ${unsubscribedCount} subscriptions`); + } + + private async handleGetHistorical( + client: WebSocket, + payload: any, + requestId?: string + ) { + const { metric, start, end, step = 60, filters = {} } = payload || {}; + + if (!metric) { + return this.sendError(client, 'Metric name is required', requestId); + } + + if (!start || !end) { + return this.sendError(client, 'Start and end time are required', requestId); + } + + const requestKey = `${requestId || Date.now()}-${metric}`; try { - const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + this.logger.debug(`Fetching historical data for: ${metric}, from ${new Date(start).toISOString()} to ${new Date(end).toISOString()}`); - if (!Array.isArray(initial)) { - throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`); - } - - this.sendMessage(client, { - event: 'metrics-data', - data: { - metric: key, - data: initial, - type: 'initial' - } - }); - - this.lastSentData.set(key, initial); - - const stopUpdates = await this.sendPeriodicUpdates( + const historicalData = await this.prometheusService.fetchMetricsRange( metric, - interval, - (freshData) => { - if (!Array.isArray(freshData)) { - this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`); - return; - } - - const lastData = this.lastSentData.get(key); - if (!this.isDataEqual(lastData, freshData)) { - this.broadcast({ - event: 'metrics-data', - data: { - metric: key, - data: freshData, - type: 'update' - } - }); - this.lastSentData.set(key, freshData); - } - }, + Math.floor(start / 1000), // Convert to seconds + Math.floor(end / 1000), // Convert to seconds + step, filters ); - this.metricSubscriptions.set(key, { - stopUpdates, - clients: new Set([clientId]), + this.sendMessage(client, { + event: 'historical-data', + data: { + metric, + filters, + data: historicalData, + start, + end, + step + }, + requestId }); - const unsubscribe = () => { - this.logger.log(`Unsubscribing client ${clientId} from ${key}`); - const subscription = this.metricSubscriptions.get(key); - if (subscription) { - subscription.clients.delete(clientId); - if (subscription.clients.size === 0) { - subscription.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); - } - } - }; - - client.on('close', unsubscribe); - client.on('error', unsubscribe); + this.logger.debug(`Historical data sent for: ${metric}, points: ${historicalData.length}`); } catch (error) { - this.logger.error(`Subscription error for ${key}:`, error); - this.sendError(client, error.message); - - if (!this.metricSubscriptions.has(key)) { - this.lastSentData.delete(key); - } + this.logger.error(`Historical data error for ${metric}:`, error); + this.sendError(client, error.message, requestId); } } - private handleUnsubscribeMetric( - clientId: string, - payload: { metric: string; filters?: Filters } + private async handleGetCurrent( + client: WebSocket, + payload: any, + requestId?: string ) { const { metric, filters = {} } = payload || {}; - if (!metric) return; - const key = this.getSubscriptionKey(metric, filters); - const sub = this.metricSubscriptions.get(key); - if (!sub) return; + if (!metric) { + return this.sendError(client, 'Metric name is required', requestId); + } - sub.clients.delete(clientId); - if (sub.clients.size === 0) { - sub.stopUpdates(); - this.metricSubscriptions.delete(key); - this.lastSentData.delete(key); + try { + const currentData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + + this.sendMessage(client, { + event: 'current-data', + data: { + metric, + filters, + data: currentData, + timestamp: Date.now() + }, + requestId + }); + + } catch (error) { + this.logger.error(`Current data error for ${metric}:`, error); + this.sendError(client, error.message, requestId); } } + private handleClientDisconnect(clientId: string) { + this.logger.log(`Client disconnected: ${clientId}`); - private getQueryParams(rawUrl?: string): Record { + // Удаляем клиента из всех подписок + this.handleUnsubscribeAll(clientId); + + // Удаляем из активных клиентов + this.activeClients.delete(clientId); + + this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); + } + + private clearAllSubscriptions() { + for (const [key, subscription] of this.realtimeSubscriptions) { + clearInterval(subscription.interval); + } + this.realtimeSubscriptions.clear(); + this.logger.log('All subscriptions cleared'); + } + + private getClientId(url?: string): string { + const params = this.getQueryParams(url); + return params.clientId || `client-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; + } + + private getQueryParams(url?: string): Record { try { - const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база - return Object.fromEntries(url.searchParams.entries()); + if (!url) return {}; + const urlObj = new URL(url, 'http://localhost'); + return Object.fromEntries(urlObj.searchParams.entries()); } catch { return {}; } } private getSubscriptionKey(metric: string, filters: Filters): string { - const keys = Object.keys(filters).sort(); - const filterString = keys - .map((k) => `${k}=${encodeURIComponent(filters[k])}`) + const sortedFilters = Object.keys(filters) + .sort() + .map(key => `${key}=${encodeURIComponent(filters[key])}`) .join('&'); - return `${metric}${filterString ? `?${filterString}` : ''}`; + + return sortedFilters ? `${metric}?${sortedFilters}` : metric; } - private isDataEqual(a: any[], b: any[]) { - if (!a || !b || a.length !== b.length) return false; - return a.every((item, i) => { - const x = b[i]; + private isDataEqual(a: PrometheusMetric[], b: PrometheusMetric[]): boolean { + if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) { + return false; + } + + return a.every((itemA, index) => { + const itemB = b[index]; return ( - item?.value === x?.value && - item?.status === x?.status && - item?.timestamp === x?.timestamp + itemA.value === itemB.value && + itemA.timestamp === itemB.timestamp && + itemA.device === itemB.device && + itemA.source_id === itemB.source_id ); }); } - private async sendPeriodicUpdates( - metric: string, - interval: number, - cb: (data: any) => void, - filters: Filters - ) { - const initialDelay = Math.floor(Math.random() * 5000); - await new Promise((r) => setTimeout(r, initialDelay)); - - const timer = setInterval(async () => { - try { - const data = await this.prometheusService.fetchMetricsWithFilters( - metric, - filters - ); - cb(data); - } catch (e: any) { - this.logger.error( - `Error in periodic update for ${metric}:`, - e?.message - ); - } - }, interval); - - return () => clearInterval(timer); - } - private sendMessage(client: WebSocket, message: any) { if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(message)); + try { + client.send(JSON.stringify(message)); + } catch (error) { + this.logger.error('Error sending message to client:', error); + } } } - private broadcast(message: any) { - const raw = JSON.stringify(message); - this.wss.clients.forEach((c) => { - if (c.readyState === WebSocket.OPEN) c.send(raw); + private broadcastToClients(clientIds: string[], message: any) { + const messageStr = JSON.stringify(message); + + clientIds.forEach(clientId => { + const client = this.activeClients.get(clientId); + if (client && client.readyState === WebSocket.OPEN) { + try { + client.send(messageStr); + } catch (error) { + this.logger.error(`Error broadcasting to client ${clientId}:`, error); + } + } }); } private sendError(client: WebSocket, error: string, requestId?: string) { this.sendMessage(client, { - event: 'metrics-error', + event: 'error', data: { error, requestId }, + requestId }); } -} - - +} \ No newline at end of file diff --git a/src/prometheus/prometheus-cache.service.ts b/src/prometheus/prometheus-cache.service.ts new file mode 100644 index 0000000..804603d --- /dev/null +++ b/src/prometheus/prometheus-cache.service.ts @@ -0,0 +1,49 @@ +import { Injectable } from '@nestjs/common'; +import { PrometheusMetric } from './prometheus-metric.interface'; + +interface CacheEntry { + data: T; + timestamp: number; +} + +interface MetadataCacheEntry { + type: string | null; + description: string | undefined; + timestamp: number; +} + +@Injectable() +export class PrometheusCacheService { + private metricCache = new Map>(); + private metadataCache = new Map(); + + getMetricCache(key: string): CacheEntry | undefined { + return this.metricCache.get(key); + } + + setMetricCache(key: string, data: PrometheusMetric[], ttl: number = 5000): void { + this.metricCache.set(key, { data, timestamp: Date.now() + ttl }); + } + + getMetadataCache(key: string): MetadataCacheEntry | undefined { + return this.metadataCache.get(key); + } + + setMetadataCache( + key: string, + type: string | null, + description: string | undefined, + ttl: number = 30000 + ): void { + this.metadataCache.set(key, { type, description, timestamp: Date.now() + ttl }); + } + + isCacheValid(cacheEntry: { timestamp: number }): boolean { + return Date.now() < cacheEntry.timestamp; + } + + clearCache(): void { + this.metricCache.clear(); + this.metadataCache.clear(); + } +} \ No newline at end of file diff --git a/src/prometheus/prometheus-query.service.ts b/src/prometheus/prometheus-query.service.ts new file mode 100644 index 0000000..0e91855 --- /dev/null +++ b/src/prometheus/prometheus-query.service.ts @@ -0,0 +1,27 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class PrometheusQueryService { + buildFilteredQuery(metric: string, filters: Record): string { + const filterParts = Object.entries(filters) + .filter(([_, value]) => value !== undefined && value !== null && value !== "") + .map(([key, value]) => `${key}="${value}"`); + + return filterParts.length > 0 + ? `${metric}{${filterParts.join(',')}}` + : metric; + } + + calculateOptimalStep(start: number, end: number): number { + const duration = end - start; + return Math.max(Math.floor(duration / 1000), 15); + } + + generateCacheKey(metric: string, filters: Record = {}): string { + return `${metric}:${JSON.stringify(filters)}`; + } + + generateMetadataCacheKey(metric: string, type: 'type' | 'description'): string { + return `metadata-${type}-${metric}`; + } +} \ No newline at end of file diff --git a/src/prometheus/prometheus.module.ts b/src/prometheus/prometheus.module.ts index 994b741..5ee8479 100644 --- a/src/prometheus/prometheus.module.ts +++ b/src/prometheus/prometheus.module.ts @@ -1,13 +1,21 @@ import { Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; +import { ConfigModule } from '@nestjs/config'; import { PrometheusService } from './prometheus.service'; import { MetricsController } from './metrics.controller'; import { MetricsGateway } from './metrics.gateway'; +import { PrometheusCacheService } from './prometheus-cache.service'; +import { PrometheusQueryService } from './prometheus-query.service'; @Module({ - imports: [HttpModule], - providers: [PrometheusService, MetricsGateway], - controllers: [MetricsController], - exports: [PrometheusService] - }) - export class PrometheusModule {} \ No newline at end of file + imports: [HttpModule, ConfigModule], + providers: [ + PrometheusCacheService, + PrometheusQueryService, + PrometheusService, + MetricsGateway + ], + controllers: [MetricsController], + exports: [PrometheusService] +}) +export class PrometheusModule { } \ No newline at end of file diff --git a/src/prometheus/prometheus.service.ts b/src/prometheus/prometheus.service.ts index 396e388..08876c6 100644 --- a/src/prometheus/prometheus.service.ts +++ b/src/prometheus/prometheus.service.ts @@ -1,47 +1,371 @@ +// import { Injectable } from '@nestjs/common'; +// import { HttpService } from '@nestjs/axios'; +// import { ConfigService } from '@nestjs/config'; +// import { lastValueFrom } from 'rxjs'; +// import { PrometheusMetric } from './prometheus-metric.interface'; +// import { MenuItem } from '../menu/menu.interface'; + +// @Injectable() +// export class PrometheusService { +// private readonly prometheusUrl: string; +// private metricCache = new Map(); +// private metadataCache = new Map(); + +// constructor( +// private readonly httpService: HttpService, +// private readonly configService: ConfigService +// ) { +// this.prometheusUrl = this.configService.get('PROMETHEUS_API', 'http://localhost:9090'); +// console.log('Prometheus API URL:', this.prometheusUrl); +// } + +// async fetchMetricType(metric: string): Promise { +// const cacheKey = `metadata-type-${metric}`; +// const cacheEntry = this.metadataCache.get(cacheKey); + +// if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { +// return cacheEntry.type; +// } + +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/metadata`, { +// params: { metric }, +// }) +// ); +// const metadata = response.data.data[metric]; +// const result = metadata?.length ? metadata[0].type : null; + +// this.metadataCache.set(cacheKey, { +// type: result, +// description: cacheEntry?.description, +// timestamp: Date.now() +// }); + +// return result; +// } catch (error) { +// console.error(`Ошибка при получении типа метрики ${metric}:`, error); +// return cacheEntry?.type || null; +// } +// } + +// async fetchMetricDescription(metric: string): Promise { +// const cacheKey = `metadata-description-${metric}`; +// const cacheEntry = this.metadataCache.get(cacheKey); + +// if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { +// return cacheEntry.description; +// } + +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/metadata`, { +// params: { metric }, +// }) +// ); +// const metadata = response.data.data[metric]; +// const result = metadata?.length ? metadata[0].help : undefined; + +// this.metadataCache.set(cacheKey, { +// type: cacheEntry?.type ?? null, +// description: result, +// timestamp: Date.now() +// }); + +// return result; +// } catch (error) { +// console.error(`Ошибка при получении описания метрики ${metric}:`, error); +// return cacheEntry?.description; +// } +// } + +// async fetchMetrics(metric: string): Promise { +// const cacheKey = `${metric}:{}`; +// const cacheEntry = this.metricCache.get(cacheKey); + +// if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { +// return cacheEntry.data; +// } + +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/query`, { +// params: { query: metric }, +// }) +// ); + +// const metricType = await this.fetchMetricType(metric); +// const metricDescription = await this.fetchMetricDescription(metric); + +// const result = response.data.data.result.map((entry): PrometheusMetric => ({ +// __name__: entry.metric.__name__ || metric, +// device: entry.metric.device, +// instance: entry.metric.instance, +// job: entry.metric.job, +// source_id: entry.metric.source_id, +// status: entry.metric.status || '0', +// timestamp: entry.value[0] * 1000, +// value: parseFloat(entry.value[1]), +// type: metricType || 'gauge', +// description: metricDescription, +// ...entry.metric +// })); + +// this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); +// return result; +// } catch (error) { +// console.error(`Error fetching metrics for ${metric}:`, error); +// if (cacheEntry) return cacheEntry.data; +// throw error; +// } +// } + +// async fetchMetricsWithFilters(metric: string, filters: Record): Promise { +// const cacheKey = `${metric}:${JSON.stringify(filters)}`; +// const cacheEntry = this.metricCache.get(cacheKey); + +// if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { +// return cacheEntry.data; +// } + +// try { +// const query = this.buildFilteredQuery(metric, filters); +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/query`, { +// params: { query } +// }) +// ); + +// const metricType = await this.fetchMetricType(metric); +// const metricDescription = await this.fetchMetricDescription(metric); + +// const result = response.data.data.result.map((entry): PrometheusMetric => ({ +// __name__: entry.metric.__name__ || metric, +// device: entry.metric.device, +// instance: entry.metric.instance, +// job: entry.metric.job, +// source_id: entry.metric.source_id, +// status: entry.metric.status || '0', +// timestamp: entry.value[0] * 1000, +// value: parseFloat(entry.value[1]), +// type: metricType || 'gauge', +// description: metricDescription, +// ...entry.metric +// })); + +// this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); +// return result; +// } catch (error) { +// console.error(`Error fetching metrics with filters for ${metric}:`, error); +// if (cacheEntry) return cacheEntry.data; +// throw error; +// } +// } + +// private buildFilteredQuery(metric: string, filters: Record): string { +// const filterParts = Object.entries(filters) +// .filter(([_, value]) => value !== undefined && value !== null && value !== "") +// .map(([key, value]) => { +// return `${key}="${value}"`; +// }); + +// return filterParts.length > 0 +// ? `${metric}{${filterParts.join(',')}}` +// : metric; +// } + +// async fetchMetricsRange(metric: string, start: number, end: number, step: number, filters: Record = {}): Promise { +// // Рассчитываем оптимальный шаг, если не указан +// const duration = end - start; +// const optimalStep = Math.max(Math.floor(duration / 1000), 15); // Минимум 15 секунд + +// const query = this.buildFilteredQuery(metric, { +// ...filters, +// instance: '192.168.2.34:9050' +// }); + +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/query_range`, { +// params: { +// query, +// start, +// end, +// step: optimalStep.toString() +// }, +// }) +// ); + +// const metricType = await this.fetchMetricType(metric); +// const metricDescription = await this.fetchMetricDescription(metric); + +// return response.data.data.result.flatMap((entry) => +// entry.values.map((value): PrometheusMetric => ({ +// __name__: entry.metric.__name__ || metric, +// device: entry.metric.device, +// instance: entry.metric.instance, +// job: entry.metric.job, +// source_id: entry.metric.source_id, +// status: entry.metric.status || '0', +// timestamp: value[0] * 1000, +// value: parseFloat(value[1]), +// type: metricType || 'gauge', +// description: metricDescription, +// ...entry.metric +// })) +// ); +// } catch (error) { +// console.error('Error in fetchMetricsRange:', { +// error: error.response?.data || error.message, +// query, +// filters +// }); +// throw error; +// } +// } + +// async getMetricsForMenuItem(menuItem: MenuItem): Promise { +// if (!menuItem.metric || !menuItem.filters) { +// throw new Error('MenuItem is not a metric item'); +// } + +// return this.fetchMetricsWithFilters(menuItem.metric, menuItem.filters); +// } + +// async fetchMetricMetadata(metric: string): Promise<{ +// name: string; +// help?: string; +// type?: string; +// }> { +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/metadata`, { +// params: { metric } +// }) +// ); + +// const data = response.data?.data?.[metric]?.[0]; + +// return { +// name: metric, +// help: data?.help, +// type: data?.type +// }; +// } catch (error) { +// console.error(`Error fetching metadata for ${metric}:`, error); +// return { +// name: metric +// }; +// } +// } + +// async fetchMetricSeries(metric: string): Promise[]> { +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/series`, { +// params: { 'match[]': metric } +// }) +// ); + +// return response.data.data || []; +// } catch (error) { +// console.error(`Error fetching series for ${metric}:`, error); +// return []; +// } +// } + +// async fetchAllMetrics(): Promise { +// try { +// const response = await lastValueFrom( +// this.httpService.get(`${this.prometheusUrl}/label/__name__/values`) +// ); +// return response.data.data; +// } catch (error) { +// console.error('Error fetching all metrics:', error); +// return []; +// } +// } + +// async fetchAllMetricsWithValues(): Promise { +// const metricNames = await this.fetchAllMetrics(); +// const zvksMetrics = metricNames.filter(metric => +// metric.startsWith('zvks') || +// metric.includes('server_li') || +// metric.includes('application_li') +// ); + +// const promises = zvksMetrics.map(async (metric) => { +// try { +// const data = await this.fetchMetrics(metric); +// return { metric, data }; +// } catch (error) { +// console.error(`Error fetching data for metric ${metric}:`, error); +// return { metric, data: [] }; +// } +// }); + +// return Promise.all(promises); +// } + +// clearCache(): void { +// this.metricCache.clear(); +// this.metadataCache.clear(); +// } +// } + + import { Injectable } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { ConfigService } from '@nestjs/config'; import { lastValueFrom } from 'rxjs'; -import { PrometheusMetric } from './prometheus-metric.interface'; import { MenuItem } from '../menu/menu.interface'; +import { PrometheusMetric } from './prometheus-metric.interface'; +import { PrometheusCacheService } from './prometheus-cache.service'; +import { PrometheusQueryService } from './prometheus-query.service'; + +interface PrometheusResponse { + status: string; + data: any; +} @Injectable() export class PrometheusService { private readonly prometheusUrl: string; - private metricCache = new Map(); - private metadataCache = new Map(); constructor( private readonly httpService: HttpService, - private readonly configService: ConfigService + private readonly configService: ConfigService, + private readonly cacheService: PrometheusCacheService, + private readonly queryService: PrometheusQueryService ) { this.prometheusUrl = this.configService.get('PROMETHEUS_API', 'http://localhost:9090'); console.log('Prometheus API URL:', this.prometheusUrl); } + private async executeQuery(url: string, params: any): Promise { + const response = await lastValueFrom( + this.httpService.get(url, { params }) + ); + return response.data; + } + async fetchMetricType(metric: string): Promise { - const cacheKey = `metadata-type-${metric}`; - const cacheEntry = this.metadataCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + const cacheKey = this.queryService.generateMetadataCacheKey(metric, 'type'); + const cacheEntry = this.cacheService.getMetadataCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.type; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric }, - }) - ); - const metadata = response.data.data[metric]; - const result = metadata?.length ? metadata[0].type : null; - - this.metadataCache.set(cacheKey, { - type: result, - description: cacheEntry?.description, - timestamp: Date.now() + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric }, }); - + + const metadata = data.data[metric]; + const result = metadata?.length ? metadata[0].type : null; + + this.cacheService.setMetadataCache(cacheKey, result, cacheEntry?.description); return result; } catch (error) { console.error(`Ошибка при получении типа метрики ${metric}:`, error); @@ -50,28 +374,22 @@ export class PrometheusService { } async fetchMetricDescription(metric: string): Promise { - const cacheKey = `metadata-description-${metric}`; - const cacheEntry = this.metadataCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) { + const cacheKey = this.queryService.generateMetadataCacheKey(metric, 'description'); + const cacheEntry = this.cacheService.getMetadataCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.description; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric }, - }) - ); - const metadata = response.data.data[metric]; + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric }, + }); + + const metadata = data.data[metric]; const result = metadata?.length ? metadata[0].help : undefined; - - this.metadataCache.set(cacheKey, { - type: cacheEntry?.type ?? null, - description: result, - timestamp: Date.now() - }); - + + this.cacheService.setMetadataCache(cacheKey, cacheEntry?.type ?? null, result); return result; } catch (error) { console.error(`Ошибка при получении описания метрики ${metric}:`, error); @@ -79,39 +397,46 @@ export class PrometheusService { } } + private transformMetricData( + entry: any, + metric: string, + type: string | null, + description: string | undefined + ): PrometheusMetric { + return { + __name__: entry.metric.__name__ || metric, + device: entry.metric.device || '', + source_id: entry.metric.source_id || '', + value: parseFloat(entry.value[1]), + timestamp: entry.value[0] * 1000, + type: type || 'gauge', + description + }; + } + async fetchMetrics(metric: string): Promise { - const cacheKey = `${metric}:{}`; - const cacheEntry = this.metricCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + const cacheKey = this.queryService.generateCacheKey(metric); + const cacheEntry = this.cacheService.getMetricCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.data; } try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query`, { - params: { query: metric }, - }) + const data = await this.executeQuery(`${this.prometheusUrl}/query`, { + query: metric + }); + + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); + + const result = data.data.result.map((entry: any) => + this.transformMetricData(entry, metric, type, description) ); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); - - const result = response.data.data.result.map((entry): PrometheusMetric => ({ - __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: entry.value[0] * 1000, - value: parseFloat(entry.value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric - })); - - this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + this.cacheService.setMetricCache(cacheKey, result); return result; } catch (error) { console.error(`Error fetching metrics for ${metric}:`, error); @@ -121,39 +446,27 @@ export class PrometheusService { } async fetchMetricsWithFilters(metric: string, filters: Record): Promise { - const cacheKey = `${metric}:${JSON.stringify(filters)}`; - const cacheEntry = this.metricCache.get(cacheKey); - - if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) { + const cacheKey = this.queryService.generateCacheKey(metric, filters); + const cacheEntry = this.cacheService.getMetricCache(cacheKey); + + if (cacheEntry && this.cacheService.isCacheValid(cacheEntry)) { return cacheEntry.data; } try { - const query = this.buildFilteredQuery(metric, filters); - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query`, { - params: { query } - }) + const query = this.queryService.buildFilteredQuery(metric, filters); + const data = await this.executeQuery(`${this.prometheusUrl}/query`, { query }); + + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); + + const result = data.data.result.map((entry: any) => + this.transformMetricData(entry, metric, type, description) ); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); - - const result = response.data.data.result.map((entry): PrometheusMetric => ({ - __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: entry.value[0] * 1000, - value: parseFloat(entry.value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric - })); - - this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() }); + this.cacheService.setMetricCache(cacheKey, result); return result; } catch (error) { console.error(`Error fetching metrics with filters for ${metric}:`, error); @@ -162,56 +475,42 @@ export class PrometheusService { } } - private buildFilteredQuery(metric: string, filters: Record): string { - const filterParts = Object.entries(filters) - .filter(([_, value]) => value !== undefined && value !== null && value !== "") - .map(([key, value]) => { - return `${key}="${value}"`; - }); - - return filterParts.length > 0 - ? `${metric}{${filterParts.join(',')}}` - : metric; - } - - async fetchMetricsRange(metric: string, start: number, end: number, step: number, filters: Record = {}): Promise { - // Рассчитываем оптимальный шаг, если не указан - const duration = end - start; - const optimalStep = Math.max(Math.floor(duration / 1000), 15); // Минимум 15 секунд - - const query = this.buildFilteredQuery(metric, { + async fetchMetricsRange( + metric: string, + start: number, + end: number, + step: number, + filters: Record = {} + ): Promise { + const query = this.queryService.buildFilteredQuery(metric, { ...filters, instance: '192.168.2.34:9050' }); + const optimalStep = this.queryService.calculateOptimalStep(start, end); + try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/query_range`, { - params: { - query, - start, - end, - step: optimalStep.toString() - }, - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/query_range`, { + query, + start, + end, + step: optimalStep.toString() + }); - const metricType = await this.fetchMetricType(metric); - const metricDescription = await this.fetchMetricDescription(metric); + const [type, description] = await Promise.all([ + this.fetchMetricType(metric), + this.fetchMetricDescription(metric) + ]); - return response.data.data.result.flatMap((entry) => - entry.values.map((value): PrometheusMetric => ({ + return data.data.result.flatMap((entry: any) => + entry.values.map((value: any) => ({ __name__: entry.metric.__name__ || metric, - device: entry.metric.device, - instance: entry.metric.instance, - job: entry.metric.job, - source_id: entry.metric.source_id, - status: entry.metric.status || '0', - timestamp: value[0] * 1000, + device: entry.metric.device || '', + source_id: entry.metric.source_id || '', value: parseFloat(value[1]), - type: metricType || 'gauge', - description: metricDescription, - ...entry.metric + timestamp: value[0] * 1000, + type: type || 'gauge', + description })) ); } catch (error) { @@ -238,36 +537,30 @@ export class PrometheusService { type?: string; }> { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/metadata`, { - params: { metric } - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/metadata`, { + params: { metric } + }); - const data = response.data?.data?.[metric]?.[0]; + const metadata = data?.data?.[metric]?.[0]; return { name: metric, - help: data?.help, - type: data?.type + help: metadata?.help, + type: metadata?.type }; } catch (error) { console.error(`Error fetching metadata for ${metric}:`, error); - return { - name: metric - }; + return { name: metric }; } } async fetchMetricSeries(metric: string): Promise[]> { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/series`, { - params: { 'match[]': metric } - }) - ); + const data = await this.executeQuery(`${this.prometheusUrl}/series`, { + 'match[]': metric + }); - return response.data.data || []; + return data.data || []; } catch (error) { console.error(`Error fetching series for ${metric}:`, error); return []; @@ -276,22 +569,20 @@ export class PrometheusService { async fetchAllMetrics(): Promise { try { - const response = await lastValueFrom( - this.httpService.get(`${this.prometheusUrl}/label/__name__/values`) - ); - return response.data.data; + const data = await this.executeQuery(`${this.prometheusUrl}/label/__name__/values`, {}); + return data.data; } catch (error) { console.error('Error fetching all metrics:', error); return []; } } - async fetchAllMetricsWithValues(): Promise { + async fetchAllMetricsWithValues(): Promise> { const metricNames = await this.fetchAllMetrics(); - const zvksMetrics = metricNames.filter(metric => - metric.startsWith('zvks') || - metric.includes('server_li') || - metric.includes('application_li') + const zvksMetrics = metricNames.filter(metric => + metric.startsWith('zvks') || + metric.includes('server_li') || + metric.includes('application_li') ); const promises = zvksMetrics.map(async (metric) => { @@ -308,7 +599,6 @@ export class PrometheusService { } clearCache(): void { - this.metricCache.clear(); - this.metadataCache.clear(); + this.cacheService.clearCache(); } } \ No newline at end of file