diff --git a/.env b/.env index dd678cd..5ffdcb0 100644 --- a/.env +++ b/.env @@ -1,7 +1,7 @@ # Прометеус PROMETHEUS_API=http://192.168.2.34:9090/api/v1 -FRONTEND_URL=localhost:5173 +FRONTEND_URL=http://localhost:5173 # Постгресс DB_HOST=192.168.2.37 diff --git a/package-lock.json b/package-lock.json index ed920a1..6e3c709 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36,7 +36,8 @@ "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "socket.io": "^4.8.1", - "typeorm": "^0.3.21" + "typeorm": "^0.3.21", + "ws": "^8.18.3" }, "devDependencies": { "@eslint/eslintrc": "^3.2.0", @@ -5956,6 +5957,27 @@ "node": ">= 0.6" } }, + "node_modules/engine.io/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/enhanced-resolve": { "version": "5.18.1", "resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz", @@ -10984,6 +11006,27 @@ } } }, + "node_modules/socket.io-adapter/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/socket.io-parser": { "version": "4.2.4", "resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz", @@ -12918,9 +12961,9 @@ "license": "ISC" }, "node_modules/ws": { - "version": "8.17.1", - "resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz", - "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "version": "8.18.3", + "resolved": "https://registry.npmmirror.com/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", "license": "MIT", "engines": { "node": ">=10.0.0" diff --git a/package.json b/package.json index 05bfc84..cbeb34f 100644 --- a/package.json +++ b/package.json @@ -20,34 +20,35 @@ "test:e2e": "jest --config ./test/jest-e2e.json" }, "dependencies": { + "@clickhouse/client": "^1.11.2", + "@clickhouse/client-web": "^1.11.2", "@nestjs/axios": "^4.0.0", "@nestjs/common": "^11.0.1", - "@nestjs/core": "^11.0.1", "@nestjs/config": "^4.0.0", - "@nestjs/platform-express": "^11.0.1", - "axios": "^1.7.9", - "reflect-metadata": "^0.2.2", - "dotenv": "^16.3.1", - "rxjs": "^7.8.1", - "@nestjs/typeorm": "^11.0.0", - "pg": "^8.14.1", - "typeorm": "^0.3.21", - "bcrypt": "^5.1.1", - "@types/bcrypt": "^5.0.2", - "socket.io": "^4.8.1", - "@nestjs/websockets": "11.0.12", - "@nestjs/platform-socket.io": "11.0.12", - "passport": "^0.7.0", - "passport-jwt": "^4.0.1", - "cookie-parser": "^1.4.7", - "@types/passport-jwt": "^4.0.1", - "@types/cookie-parser": "^1.4.8", + "@nestjs/core": "^11.0.1", "@nestjs/jwt": "^11.0.0", "@nestjs/passport": "^11.0.5", + "@nestjs/platform-express": "^11.0.1", + "@nestjs/platform-socket.io": "11.0.12", "@nestjs/swagger": "11.1.4", - "@clickhouse/client": "^1.11.2", + "@nestjs/typeorm": "^11.0.0", + "@nestjs/websockets": "11.0.12", + "@types/bcrypt": "^5.0.2", + "@types/cookie-parser": "^1.4.8", + "@types/passport-jwt": "^4.0.1", + "axios": "^1.7.9", + "bcrypt": "^5.1.1", + "cookie-parser": "^1.4.7", "date-fns": "4.1.0", - "@clickhouse/client-web": "^1.11.2" + "dotenv": "^16.3.1", + "passport": "^0.7.0", + "passport-jwt": "^4.0.1", + "pg": "^8.14.1", + "reflect-metadata": "^0.2.2", + "rxjs": "^7.8.1", + "socket.io": "^4.8.1", + "typeorm": "^0.3.21", + "ws": "^8.18.3" }, "devDependencies": { "@eslint/eslintrc": "^3.2.0", @@ -93,4 +94,4 @@ "coverageDirectory": "../coverage", "testEnvironment": "node" } -} \ No newline at end of file +} diff --git a/src/prometheus/metrics.gateway.ts b/src/prometheus/metrics.gateway.ts index 9e76e23..366b7c7 100644 --- a/src/prometheus/metrics.gateway.ts +++ b/src/prometheus/metrics.gateway.ts @@ -1,253 +1,391 @@ -import { - WebSocketGateway, - WebSocketServer, - OnGatewayInit, - OnGatewayConnection, - OnGatewayDisconnect, - SubscribeMessage, -} from '@nestjs/websockets'; -import { Server, Socket } from 'socket.io'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Server, WebSocket } from 'ws'; +import { createServer } from 'http'; import { PrometheusService } from './prometheus.service'; -import { Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; -@WebSocketGateway({ - cors: { - origin: process.env.FRONTEND_URL, - methods: ['GET', 'POST'], - credentials: true - }, - namespace: '/api/metrics-ws' -}) -export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { - @WebSocketServer() server: Server; +type Filters = Record; + +@Injectable() +export class MetricsGateway implements OnModuleInit { private readonly logger = new Logger(MetricsGateway.name); - private activeSockets: Map = new Map(); - private metricSubscriptions = new Map void; - clients: Set; - }>(); - private lastSentData = new Map(); // Кэш последних отправленных данных + private wss: Server; - constructor(private readonly prometheusService: PrometheusService) { } + private activeSockets = new Map(); + private metricSubscriptions = new Map< + string, + { stopUpdates: () => void; clients: Set } + >(); - afterInit(server: Server) { - this.logger.log('WebSocket Gateway initialized'); + private lastSentData = new Map(); + + constructor( + private readonly prometheusService: PrometheusService, + private readonly configService: ConfigService + ) { } + + onModuleInit() { + const httpServer = createServer(); + this.wss = new Server({ + server: httpServer, + path: '/metrics-ws', + }); + + this.wss.on('connection', (client, request) => + this.handleConnection(client, request) + ); + this.wss.on('error', (err) => + this.logger.error('WebSocket server error:', err) + ); + + const wsPort = Number(this.configService.get('WS_PORT') || 3001); + httpServer.listen(wsPort, () => { + this.logger.log( + `WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws` + ); + }); } - handleConnection(client: Socket) { - this.logger.log(`Client connected: ${client.id}`); - this.activeSockets.set(client.id, client); + + private handleConnection(client: WebSocket, request: any) { + let clientId = + this.getQueryParams(request?.url).clientId || + Math.random().toString(36).slice(2); + + this.activeSockets.set(clientId, client); + this.logger.log(`Client connected: ${clientId}`); + + client.on('message', (raw) => { + try { + const msg = JSON.parse(raw.toString()); + this.handleMessage(clientId, client, msg); + } catch (err) { + this.sendError(client, 'Invalid JSON'); + } + }); + + client.on('close', () => this.cleanupClient(clientId)); + client.on('error', (err) => { + this.logger.error(`Client ${clientId} error:`, err); + this.cleanupClient(clientId); + }); } - handleDisconnect(client: Socket) { - this.logger.log(`Client disconnected: ${client.id}`); - this.activeSockets.delete(client.id); + private handleMessage(clientId: string, client: WebSocket, message: any) { + const { event, data } = message || {}; + if (!event) return this.sendError(client, 'Event type is required'); - // Очистка всех подписок этого клиента - for (const [metric, subscription] of this.metricSubscriptions) { - subscription.clients.delete(client.id); - if (subscription.clients.size === 0) { - subscription.stopUpdates(); - this.metricSubscriptions.delete(metric); - this.lastSentData.delete(metric); + switch (event) { + case 'unsubscribe-all': + return this.unsubscribeAllForClient(clientId); + + case 'get-metrics': + return this.handleGetMetrics(client, data); + + case 'subscribe-metric': + return this.handleSubscribeMetric(clientId, client, data); + + case 'unsubscribe-metric': + return this.handleUnsubscribeMetric(clientId, data); + + default: + return this.sendError(client, `Unknown event type: ${event}`); + } + } + + private cleanupClient(clientId: string) { + if (!this.activeSockets.has(clientId)) return; + this.logger.log(`Client disconnected: ${clientId}`); + this.activeSockets.delete(clientId); + + setTimeout(() => { + if (!this.activeSockets.has(clientId)) { + } + }, 5000); + + for (const [key, sub] of this.metricSubscriptions) { + sub.clients.delete(clientId); + if (sub.clients.size === 0) { + sub.stopUpdates(); + this.metricSubscriptions.delete(key); + this.lastSentData.delete(key); } } } - @SubscribeMessage('unsubscribe-all') - handleUnsubscribeAll(client: Socket) { - for (const [metric, subscription] of this.metricSubscriptions) { - subscription.clients.delete(client.id); - if (subscription.clients.size === 0) { - subscription.stopUpdates(); - this.metricSubscriptions.delete(metric); - this.lastSentData.delete(metric); + private unsubscribeAllForClient(clientId: string) { + for (const [key, sub] of this.metricSubscriptions) { + if (sub.clients.has(clientId)) sub.clients.delete(clientId); + if (sub.clients.size === 0) { + sub.stopUpdates(); + this.metricSubscriptions.delete(key); + this.lastSentData.delete(key); } } } - @SubscribeMessage('get-metrics') - async handleGetMetrics(client: Socket, payload: any) { - const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload; + + private async handleGetMetrics(client: WebSocket, payload: any) { + const { + metric, + start, + end, + step, + isRangeQuery, + requestId, + filters = {}, + } = payload || {}; if (!metric) { - client.emit('metrics-error', { - error: 'Metric name is required', - requestId - }); - return; + return this.sendError(client, 'Metric name is required', requestId); } if (isRangeQuery) { try { - const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters); - client.emit('metrics-data', { metric, data, requestId }); - return; - } catch (error) { - client.emit('metrics-error', { - error: error.message, - requestId + const rangeData = await this.prometheusService.fetchMetricsRange( + metric, + start, + end, + step, + filters + ); + this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200)); + + return this.sendMessage(client, { + event: 'metrics-data', + data: rangeData, + metric, + requestId, }); - return; + } catch (err: any) { + return this.sendError(client, err?.message || 'Range query error', requestId); } } try { const subscriptionKey = this.getSubscriptionKey(metric, filters); - // Отправляем текущие данные сразу при запросе - const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); - client.emit('metrics-data', { metric, data: initialData, requestId }); - this.lastSentData.set(subscriptionKey, initialData); - const stopUpdates = await this.sendPeriodicUpdates( - metric, - step || 5000, - (data) => { - const lastData = this.lastSentData.get(subscriptionKey); - if (!this.isDataEqual(lastData, data)) { - client.emit('metrics-data', { metric, data, requestId }); - this.lastSentData.set(subscriptionKey, data); - } - }, - filters - ); + const initialData = + await this.prometheusService.fetchMetricsWithFilters(metric, filters); - const cleanup = () => { - stopUpdates(); - this.lastSentData.delete(subscriptionKey); - client.off('disconnect', cleanup); - client.off('unsubscribe-metric', cleanup); - }; - - client.on('disconnect', cleanup); - client.on('unsubscribe-metric', cleanup); - - } catch (error) { - client.emit('metrics-error', { - error: error.message, - requestId + this.sendMessage(client, { + event: 'metrics-data', + data: { metric, data: initialData, requestId }, }); + + let lastLocal = initialData; + + const jitter = Math.floor(Math.random() * 5000); + setTimeout(() => { + const timer = setInterval(async () => { + try { + const fresh = + await this.prometheusService.fetchMetricsWithFilters( + metric, + filters + ); + if (!this.isDataEqual(lastLocal, fresh)) { + this.sendMessage(client, { + event: 'metrics-data', + data: { metric, data: fresh, requestId }, + }); + lastLocal = fresh; + } + } catch (e) { + this.logger.error( + `Error in on-demand periodic update for ${subscriptionKey}:`, + (e as any)?.message + ); + } + }, step || 5000); + + const closeOnce = () => clearInterval(timer); + client.once('close', closeOnce); + client.once('error', closeOnce); + }, jitter); + } catch (err: any) { + return this.sendError(client, err?.message || 'get-metrics error', requestId); } } - private getSubscriptionKey(metric: string, filters: Record): string { - const filterKeys = Object.keys(filters).sort(); - const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&'); - return `${metric}${filterString ? `?${filterString}` : ''}`; - } - - // Сравниваем данные, чтобы избежать лишних отправок - private isDataEqual(oldData: any[], newData: any[]): boolean { - if (!oldData || !newData || oldData.length !== newData.length) return false; - - return oldData.every((oldItem, index) => { - const newItem = newData[index]; - return ( - oldItem.value === newItem.value && - oldItem.status === newItem.status && - oldItem.timestamp === newItem.timestamp - ); - }); - } - - @SubscribeMessage('subscribe-metric') - async handleSubscribeMetric( - client: Socket, - payload: { - metric: string; - interval?: number; - filters?: Record; + private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) { + const { metric, interval = 60000, filters = {} } = payload || {}; + if (!metric) { + this.sendError(client, 'Metric name is required'); + return; } - ) { - const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд - const subscriptionKey = this.getSubscriptionKey(metric, filters); - if (!this.metricSubscriptions.has(subscriptionKey)) { - // Отправляем текущие данные сразу при подписке - try { - const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); - client.emit('metrics-data', { - metric: subscriptionKey, - data: initialData - }); - this.lastSentData.set(subscriptionKey, initialData); - } catch (error) { - this.logger.error(`Error fetching initial data for ${metric}:`, error.message); + const key = this.getSubscriptionKey(metric, filters); + + try { + const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters); + + if (!Array.isArray(initial)) { + throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`); } + this.sendMessage(client, { + event: 'metrics-data', + data: { + metric: key, + data: initial, + type: 'initial' + } + }); + + this.lastSentData.set(key, initial); + const stopUpdates = await this.sendPeriodicUpdates( metric, interval, - (data) => { - // Отправляем только если данные изменились - const lastData = this.lastSentData.get(subscriptionKey); - if (!this.isDataEqual(lastData, data)) { - this.server.emit('metrics-data', { - metric: subscriptionKey, - data + (freshData) => { + if (!Array.isArray(freshData)) { + this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`); + return; + } + + const lastData = this.lastSentData.get(key); + if (!this.isDataEqual(lastData, freshData)) { + this.broadcast({ + event: 'metrics-data', + data: { + metric: key, + data: freshData, + type: 'update' + } }); - this.lastSentData.set(subscriptionKey, data); + this.lastSentData.set(key, freshData); } }, filters ); - this.metricSubscriptions.set(subscriptionKey, { + this.metricSubscriptions.set(key, { stopUpdates, - clients: new Set([client.id]) + clients: new Set([clientId]), }); - } else { - this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id); - // Отправляем кэшированные данные новому клиенту - const cachedData = this.lastSentData.get(subscriptionKey); - if (cachedData) { - client.emit('metrics-data', { - metric: subscriptionKey, - data: cachedData - }); + + const unsubscribe = () => { + this.logger.log(`Unsubscribing client ${clientId} from ${key}`); + const subscription = this.metricSubscriptions.get(key); + if (subscription) { + subscription.clients.delete(clientId); + if (subscription.clients.size === 0) { + subscription.stopUpdates(); + this.metricSubscriptions.delete(key); + this.lastSentData.delete(key); + } + } + }; + + client.on('close', unsubscribe); + client.on('error', unsubscribe); + + } catch (error) { + this.logger.error(`Subscription error for ${key}:`, error); + this.sendError(client, error.message); + + if (!this.metricSubscriptions.has(key)) { + this.lastSentData.delete(key); } } - - const unsubscribe = () => { - const subscription = this.metricSubscriptions.get(subscriptionKey); - if (subscription) { - subscription.clients.delete(client.id); - if (subscription.clients.size === 0) { - subscription.stopUpdates(); - this.metricSubscriptions.delete(subscriptionKey); - this.lastSentData.delete(subscriptionKey); - } - } - }; - - client.on('disconnect', unsubscribe); - client.on('unsubscribe-metric', unsubscribe); } - async sendPeriodicUpdates( + private handleUnsubscribeMetric( + clientId: string, + payload: { metric: string; filters?: Filters } + ) { + const { metric, filters = {} } = payload || {}; + if (!metric) return; + + const key = this.getSubscriptionKey(metric, filters); + const sub = this.metricSubscriptions.get(key); + if (!sub) return; + + sub.clients.delete(clientId); + if (sub.clients.size === 0) { + sub.stopUpdates(); + this.metricSubscriptions.delete(key); + this.lastSentData.delete(key); + } + } + + + private getQueryParams(rawUrl?: string): Record { + try { + const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база + return Object.fromEntries(url.searchParams.entries()); + } catch { + return {}; + } + } + + private getSubscriptionKey(metric: string, filters: Filters): string { + const keys = Object.keys(filters).sort(); + const filterString = keys + .map((k) => `${k}=${encodeURIComponent(filters[k])}`) + .join('&'); + return `${metric}${filterString ? `?${filterString}` : ''}`; + } + + private isDataEqual(a: any[], b: any[]) { + if (!a || !b || a.length !== b.length) return false; + return a.every((item, i) => { + const x = b[i]; + return ( + item?.value === x?.value && + item?.status === x?.status && + item?.timestamp === x?.timestamp + ); + }); + } + + private async sendPeriodicUpdates( metric: string, interval: number, - callback: (data: any) => void, - filters: Record = {} + cb: (data: any) => void, + filters: Filters ) { - // Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки const initialDelay = Math.floor(Math.random() * 5000); - - await new Promise(resolve => setTimeout(resolve, initialDelay)); + await new Promise((r) => setTimeout(r, initialDelay)); const timer = setInterval(async () => { try { - const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters); - callback(data); - } catch (error) { - this.logger.error(`Error in periodic update for ${metric}:`, error.message); + const data = await this.prometheusService.fetchMetricsWithFilters( + metric, + filters + ); + cb(data); + } catch (e: any) { + this.logger.error( + `Error in periodic update for ${metric}:`, + e?.message + ); } }, interval); - return () => { - clearInterval(timer); - this.lastSentData.delete(this.getSubscriptionKey(metric, filters)); - this.logger.log(`Stopped updates for ${metric}`); - }; + return () => clearInterval(timer); } -} \ No newline at end of file + + private sendMessage(client: WebSocket, message: any) { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify(message)); + } + } + + private broadcast(message: any) { + const raw = JSON.stringify(message); + this.wss.clients.forEach((c) => { + if (c.readyState === WebSocket.OPEN) c.send(raw); + }); + } + + private sendError(client: WebSocket, error: string, requestId?: string) { + this.sendMessage(client, { + event: 'metrics-error', + data: { error, requestId }, + }); + } +} + +