import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Server, WebSocket } from 'ws'; import { createServer } from 'http'; import { PrometheusService } from './prometheus.service'; import { ConfigService } from '@nestjs/config'; import { PrometheusMetric } from './prometheus-metric.interface'; type Filters = Record; interface RealtimeSubscription { clients: Set; interval: NodeJS.Timeout; metric: string; filters: Filters; lastData: PrometheusMetric[]; } interface HistoricalRequest { client: WebSocket; requestId: string; } @Injectable() export class MetricsGateway implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(MetricsGateway.name); private wss: Server; private httpServer: ReturnType; // Real-time подписки (одна на метрику, много клиентов) private realtimeSubscriptions = new Map(); // Активные клиенты private activeClients = new Map(); // Исторические запросы (для отслеживания) private historicalRequests = new Map(); constructor( private readonly prometheusService: PrometheusService, private readonly configService: ConfigService ) { } onModuleInit() { this.httpServer = createServer(); this.wss = new Server({ server: this.httpServer, path: '/metrics-ws', }); this.wss.on('connection', (client, request) => this.handleConnection(client, request) ); this.wss.on('error', (err) => this.logger.error('WebSocket server error:', err) ); const wsPort = Number(this.configService.get('WS_PORT') || 3001); this.httpServer.listen(wsPort, () => { this.logger.log( `WebSocket server running at ws://localhost:${wsPort}/metrics-ws` ); }); } onModuleDestroy() { // Очистка всех ресурсов this.clearAllSubscriptions(); this.wss?.close(); this.httpServer?.close(); } private handleConnection(client: WebSocket, request: any) { const clientId = this.getClientId(request?.url); this.activeClients.set(clientId, client); this.logger.log(`Client connected: ${clientId}`); this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); client.on('message', (raw) => { try { const message = JSON.parse(raw.toString()); this.handleMessage(clientId, client, message); } catch (err) { this.sendError(client, 'Invalid JSON format'); } }); client.on('close', () => this.handleClientDisconnect(clientId)); client.on('error', (err) => { this.logger.error(`Client ${clientId} error:`, err); this.handleClientDisconnect(clientId); }); // Отправляем приветственное сообщение this.sendMessage(client, { event: 'connected', data: { clientId, timestamp: Date.now() } }); } private handleMessage(clientId: string, client: WebSocket, message: any) { const { event, data, requestId } = message; if (!event) { return this.sendError(client, 'Event type is required', requestId); } this.logger.debug(`Received event: ${event} from client: ${clientId}`); switch (event) { case 'subscribe-realtime': return this.handleSubscribeRealtime(clientId, client, data, requestId); case 'unsubscribe-realtime': return this.handleUnsubscribeRealtime(clientId, data, requestId); case 'unsubscribe-all': return this.handleUnsubscribeAll(clientId, requestId); case 'get-historical': return this.handleGetHistorical(client, data, requestId); case 'get-current': return this.handleGetCurrent(client, data, requestId); default: return this.sendError(client, `Unknown event type: ${event}`, requestId); } } private async handleSubscribeRealtime( clientId: string, client: WebSocket, payload: any, requestId?: string ) { const { metric, filters = {}, interval = 10000 } = payload || {}; if (!metric) { return this.sendError(client, 'Metric name is required', requestId); } const subscriptionKey = this.getSubscriptionKey(metric, filters); try { // Если подписка уже существует, просто добавляем клиента if (this.realtimeSubscriptions.has(subscriptionKey)) { const subscription = this.realtimeSubscriptions.get(subscriptionKey)!; subscription.clients.add(clientId); this.logger.log(`Client ${clientId} added to existing subscription: ${subscriptionKey}`); // Отправляем последние данные клиенту if (subscription.lastData) { this.sendMessage(client, { event: 'realtime-data', data: { metric, filters, data: subscription.lastData, type: 'initial' }, requestId }); } return; } // Создаем новую подписку this.logger.log(`Creating new realtime subscription: ${subscriptionKey}`); // Первоначальная загрузка данных const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); if (!Array.isArray(initialData)) { throw new Error(`Expected array for metric ${metric}, got ${typeof initialData}`); } // Создаем интервал для обновлений const intervalId = setInterval( () => this.updateRealtimeData(subscriptionKey), interval ); // Сохраняем подписку const subscription: RealtimeSubscription = { clients: new Set([clientId]), interval: intervalId, metric, filters, lastData: initialData }; this.realtimeSubscriptions.set(subscriptionKey, subscription); // Отправляем данные клиенту this.sendMessage(client, { event: 'realtime-data', data: { metric, filters, data: initialData, type: 'initial' }, requestId }); this.logger.debug(`Subscription created for ${subscriptionKey} with ${interval}ms interval`); } catch (error) { this.logger.error(`Subscribe error for ${subscriptionKey}:`, error); this.sendError(client, error.message, requestId); } } private async updateRealtimeData(subscriptionKey: string) { const subscription = this.realtimeSubscriptions.get(subscriptionKey); if (!subscription) return; try { const freshData = await this.prometheusService.fetchMetricsWithFilters( subscription.metric, subscription.filters ); if (!this.isDataEqual(subscription.lastData, freshData)) { subscription.lastData = freshData; // Рассылаем обновление всем подписанным клиентам this.broadcastToClients(Array.from(subscription.clients), { event: 'realtime-data', data: { metric: subscription.metric, filters: subscription.filters, data: freshData, type: 'update' } }); this.logger.debug(`Data updated for subscription: ${subscriptionKey}`); } } catch (error) { this.logger.error(`Update error for ${subscriptionKey}:`, error); } } private handleUnsubscribeRealtime( clientId: string, payload: any, requestId?: string ) { const { metric, filters = {} } = payload || {}; if (!metric) { return; } const subscriptionKey = this.getSubscriptionKey(metric, filters); const subscription = this.realtimeSubscriptions.get(subscriptionKey); if (!subscription) { this.logger.debug(`No subscription found for: ${subscriptionKey}`); return; } // Удаляем клиента из подписки subscription.clients.delete(clientId); this.logger.log(`Client ${clientId} unsubscribed from: ${subscriptionKey}`); // Если больше нет клиентов, очищаем подписку if (subscription.clients.size === 0) { clearInterval(subscription.interval); this.realtimeSubscriptions.delete(subscriptionKey); this.logger.log(`Subscription removed: ${subscriptionKey}`); } } private handleUnsubscribeAll(clientId: string, requestId?: string) { let unsubscribedCount = 0; for (const [key, subscription] of this.realtimeSubscriptions) { if (subscription.clients.has(clientId)) { subscription.clients.delete(clientId); unsubscribedCount++; if (subscription.clients.size === 0) { clearInterval(subscription.interval); this.realtimeSubscriptions.delete(key); } } } this.logger.log(`Client ${clientId} unsubscribed from ${unsubscribedCount} subscriptions`); } private async handleGetHistorical( client: WebSocket, payload: any, requestId?: string ) { const { metric, start, end, step = 60, filters = {} } = payload || {}; if (!metric) { return this.sendError(client, 'Metric name is required', requestId); } if (!start || !end) { return this.sendError(client, 'Start and end time are required', requestId); } const requestKey = `${requestId || Date.now()}-${metric}`; try { this.logger.debug(`Fetching historical data for: ${metric}, from ${new Date(start).toISOString()} to ${new Date(end).toISOString()}`); const historicalData = await this.prometheusService.fetchMetricsRange( metric, Math.floor(start / 1000), // Convert to seconds Math.floor(end / 1000), // Convert to seconds step, filters ); this.sendMessage(client, { event: 'historical-data', data: { metric, filters, data: historicalData, start, end, step }, requestId }); this.logger.debug(`Historical data sent for: ${metric}, points: ${historicalData.length}`); } catch (error) { this.logger.error(`Historical data error for ${metric}:`, error); this.sendError(client, error.message, requestId); } } private async handleGetCurrent( client: WebSocket, payload: any, requestId?: string ) { const { metric, filters = {} } = payload || {}; if (!metric) { return this.sendError(client, 'Metric name is required', requestId); } try { const currentData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); this.sendMessage(client, { event: 'current-data', data: { metric, filters, data: currentData, timestamp: Date.now() }, requestId }); } catch (error) { this.logger.error(`Current data error for ${metric}:`, error); this.sendError(client, error.message, requestId); } } private handleClientDisconnect(clientId: string) { this.logger.log(`Client disconnected: ${clientId}`); // Удаляем клиента из всех подписок this.handleUnsubscribeAll(clientId); // Удаляем из активных клиентов this.activeClients.delete(clientId); this.logger.debug(`Active clients: ${this.activeClients.size}, Subscriptions: ${this.realtimeSubscriptions.size}`); } private clearAllSubscriptions() { for (const [key, subscription] of this.realtimeSubscriptions) { clearInterval(subscription.interval); } this.realtimeSubscriptions.clear(); this.logger.log('All subscriptions cleared'); } private getClientId(url?: string): string { const params = this.getQueryParams(url); return params.clientId || `client-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; } private getQueryParams(url?: string): Record { try { if (!url) return {}; const urlObj = new URL(url, 'http://localhost'); return Object.fromEntries(urlObj.searchParams.entries()); } catch { return {}; } } private getSubscriptionKey(metric: string, filters: Filters): string { const sortedFilters = Object.keys(filters) .sort() .map(key => `${key}=${encodeURIComponent(filters[key])}`) .join('&'); return sortedFilters ? `${metric}?${sortedFilters}` : metric; } private isDataEqual(a: PrometheusMetric[], b: PrometheusMetric[]): boolean { if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) { return false; } return a.every((itemA, index) => { const itemB = b[index]; return ( itemA.value === itemB.value && itemA.timestamp === itemB.timestamp && itemA.device === itemB.device && itemA.source_id === itemB.source_id ); }); } private sendMessage(client: WebSocket, message: any) { if (client.readyState === WebSocket.OPEN) { try { client.send(JSON.stringify(message)); } catch (error) { this.logger.error('Error sending message to client:', error); } } } private broadcastToClients(clientIds: string[], message: any) { const messageStr = JSON.stringify(message); clientIds.forEach(clientId => { const client = this.activeClients.get(clientId); if (client && client.readyState === WebSocket.OPEN) { try { client.send(messageStr); } catch (error) { this.logger.error(`Error broadcasting to client ${clientId}:`, error); } } }); } private sendError(client: WebSocket, error: string, requestId?: string) { this.sendMessage(client, { event: 'error', data: { error, requestId }, requestId }); } }