trust-module-backend/src/metrics.gateway.ts

194 lines
6.2 KiB
TypeScript

import {
WebSocketGateway,
WebSocketServer,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { PrometheusService } from './prometheus.service';
import { Logger } from '@nestjs/common';
/**
 * Socket.IO gateway that streams Prometheus metrics to clients connected to
 * the `/api/metrics-ws` namespace.
 *
 * Two delivery modes are offered:
 * - `get-metrics`: a one-off range query, or (when `isRangeQuery` is falsy) a
 *   private polling loop whose results are sent only to the requesting socket.
 * - `subscribe-metric`: a polling loop shared by every client that asks for
 *   the same metric + filter combination.
 *
 * All timers are tracked per client so that they can be released on
 * `unsubscribe-metric`, `unsubscribe-all` and on disconnect.
 */
@WebSocketGateway({
  cors: {
    origin: process.env.FRONTEND_URL,
    methods: ['GET', 'POST'],
    credentials: true,
  },
  namespace: '/api/metrics-ws',
})
export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  /** Polling period used when the client supplies none (or an unusable one). */
  private static readonly DEFAULT_INTERVAL_MS = 5000;

  /** Largest delay `setInterval` honours; larger values are treated as 1 ms by Node. */
  private static readonly MAX_INTERVAL_MS = 2147483647;

  @WebSocketServer() server: Server;

  private readonly logger = new Logger(MetricsGateway.name);

  /** Connected sockets by id; used to address updates to subscribers only. */
  private activeSockets: Map<string, Socket> = new Map();

  /** Shared polling loops, keyed by the string built in `getSubscriptionKey`. */
  private metricSubscriptions = new Map<string, {
    metric: string;
    stopUpdates: () => void;
    clients: Set<string>;
  }>();

  /** Private polling loops started through `get-metrics`, grouped by client id. */
  private clientPollers = new Map<string, Set<{
    metric: string;
    key: string;
    stop: () => void;
  }>>();

  constructor(private readonly prometheusService: PrometheusService) { }

  /** Lifecycle hook: logs that the gateway is ready. */
  afterInit(server: Server) {
    this.logger.log('WebSocket Gateway initialized');
  }

  /** Lifecycle hook: remembers the socket so updates can be addressed to it. */
  handleConnection(client: Socket) {
    this.logger.log(`Client connected: ${client.id}`);
    this.activeSockets.set(client.id, client);
  }

  /** Lifecycle hook: forgets the socket and releases everything it was using. */
  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
    this.activeSockets.delete(client.id);
    // Clean up all subscriptions (and private pollers) of this client.
    this.releaseClient(client.id);
  }

  /**
   * Drops every shared subscription and every private polling loop owned by
   * the calling client.
   */
  @SubscribeMessage('unsubscribe-all')
  handleUnsubscribeAll(client: Socket): void {
    this.releaseClient(client.id);
  }

  /**
   * Stops updates for one metric, or for everything when no metric is named.
   *
   * - No payload / no metric: behaves like `unsubscribe-all`.
   * - `metric` only: releases every subscription and poller of this client
   *   whose metric name (or full subscription key) equals `metric`.
   * - `metric` and `filters`: releases only the exact metric + filter entry.
   *
   * @param client  Socket that sent the event.
   * @param payload Metric name, or an object with `metric` and optional `filters`.
   */
  @SubscribeMessage('unsubscribe-metric')
  handleUnsubscribeMetric(
    client: Socket,
    payload?: { metric?: string; filters?: Record<string, string> } | string
  ): void {
    const metric = typeof payload === 'string' ? payload : payload?.metric;
    if (typeof metric !== 'string' || metric === '') {
      this.releaseClient(client.id);
      return;
    }

    const filters = typeof payload === 'object' && payload !== null ? payload.filters : undefined;
    const exactKey = filters ? this.getSubscriptionKey(metric, filters) : undefined;
    const matches = (entryMetric: string, entryKey: string): boolean =>
      exactKey !== undefined
        ? entryKey === exactKey
        : entryMetric === metric || entryKey === metric;

    // Deleting from a Map while iterating it with for...of is well-defined.
    for (const [key, subscription] of this.metricSubscriptions) {
      if (matches(subscription.metric, key)) {
        this.removeClientFromSubscription(key, client.id);
      }
    }
    this.stopPollers(client.id, (poller) => matches(poller.metric, poller.key));
  }

  /**
   * Serves a `get-metrics` request.
   *
   * With `isRangeQuery` set, performs a single range query and replies with
   * one `metrics-data` event. Otherwise starts a polling loop private to this
   * client (period taken from `step`, in milliseconds) that keeps emitting
   * `metrics-data` until the client unsubscribes or disconnects. Failures are
   * reported through `metrics-error`, echoing `requestId`.
   *
   * @param client  Socket that sent the request.
   * @param payload Untrusted request body; may be missing.
   */
  @SubscribeMessage('get-metrics')
  async handleGetMetrics(client: Socket, payload: any): Promise<void> {
    // A client may emit the event with no body at all; do not throw on that.
    const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload ?? {};
    if (!metric) {
      client.emit('metrics-error', {
        error: 'Metric name is required',
        requestId
      });
      return;
    }

    if (isRangeQuery) {
      try {
        const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters);
        client.emit('metrics-data', { metric, data, requestId });
      } catch (error: unknown) {
        client.emit('metrics-error', {
          error: this.errorMessage(error),
          requestId
        });
      }
      return;
    }

    try {
      // Build the key first: if `filters` is malformed this throws before a
      // timer exists, so nothing can leak.
      const key = this.getSubscriptionKey(metric, filters);
      const stop = await this.sendPeriodicUpdates(
        metric,
        this.normalizeInterval(step),
        (data) => {
          client.emit('metrics-data', { metric, data, requestId });
        },
        filters
      );
      let pollers = this.clientPollers.get(client.id);
      if (!pollers) {
        pollers = new Set();
        this.clientPollers.set(client.id, pollers);
      }
      pollers.add({ metric, key, stop });
    } catch (error: unknown) {
      client.emit('metrics-error', {
        error: this.errorMessage(error),
        requestId
      });
    }
  }

  /**
   * Builds the canonical key for a metric + filter combination: the metric
   * name, followed by `?k=v&k=v` with filter names sorted and values
   * URI-encoded, so that equal filter sets always produce the same key.
   */
  private getSubscriptionKey(metric: string, filters: Record<string, string>): string {
    const filterKeys = Object.keys(filters).sort();
    const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&');
    return `${metric}${filterString ? `?${filterString}` : ''}`;
  }

  /**
   * Adds the client to the shared polling loop for `metric` + `filters`,
   * creating the loop if it does not exist yet. Updates are emitted as
   * `metrics-data` with `metric` set to the subscription key, and are sent
   * only to the sockets that subscribed.
   *
   * The polling period is fixed by whichever client creates the loop; the
   * `interval` of later subscribers to the same key is ignored.
   *
   * @param client  Socket that sent the request.
   * @param payload Metric name, optional period in milliseconds, optional filters.
   */
  @SubscribeMessage('subscribe-metric')
  async handleSubscribeMetric(
    client: Socket,
    payload: {
      metric: string;
      interval?: number;
      filters?: Record<string, string>;
    }
  ): Promise<void> {
    if (!payload || typeof payload.metric !== 'string' || payload.metric === '') {
      client.emit('metrics-error', { error: 'Metric name is required' });
      return;
    }
    const { metric, interval, filters = {} } = payload;
    const subscriptionKey = this.getSubscriptionKey(metric, filters);

    // Idempotent; guarantees the subscriber is addressable even if this
    // handler runs for a socket the connection hook has not recorded.
    this.activeSockets.set(client.id, client);

    const existing = this.metricSubscriptions.get(subscriptionKey);
    if (existing) {
      existing.clients.add(client.id);
      return;
    }

    const stopUpdates = await this.sendPeriodicUpdates(
      metric,
      this.normalizeInterval(interval),
      (data) => this.emitToSubscribers(subscriptionKey, data),
      filters
    );

    // Another request for the same key may have registered while we awaited;
    // keep that loop and discard ours instead of orphaning a timer.
    const raced = this.metricSubscriptions.get(subscriptionKey);
    if (raced) {
      stopUpdates();
      raced.clients.add(client.id);
      return;
    }

    this.metricSubscriptions.set(subscriptionKey, {
      metric,
      stopUpdates,
      clients: new Set([client.id])
    });
  }

  /**
   * Starts polling Prometheus for `metric` every `interval` milliseconds and
   * hands each result to `callback`.
   *
   * A tick is skipped while the previous fetch is still pending, and results
   * that arrive after the loop was stopped are dropped. Fetch errors are
   * logged and do not end the loop.
   *
   * @param metric   Metric name passed to the Prometheus service.
   * @param interval Polling period in milliseconds; unusable values fall back
   *                 to the default period.
   * @param callback Receives the data of every successful fetch.
   * @param filters  Filters passed to the Prometheus service.
   * @returns A function that stops the loop; calling it again is a no-op.
   */
  async sendPeriodicUpdates(
    metric: string,
    interval: number,
    callback: (data: any) => void,
    filters: Record<string, string> = {}
  ): Promise<() => void> {
    let stopped = false;
    let inFlight = false;

    const tick = async (): Promise<void> => {
      if (stopped || inFlight) {
        return;
      }
      inFlight = true;
      try {
        const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
        // The loop may have been stopped while the request was pending.
        if (!stopped) {
          callback(data);
        }
      } catch (error: unknown) {
        this.logger.error(`Error in periodic update for ${metric}:`, this.errorMessage(error));
      } finally {
        inFlight = false;
      }
    };

    const timer = setInterval(() => void tick(), this.normalizeInterval(interval));

    return () => {
      if (stopped) {
        return;
      }
      stopped = true;
      clearInterval(timer);
      this.logger.log(`Stopped updates for ${metric}`);
    };
  }

  /** Sends one update to exactly the sockets subscribed to `subscriptionKey`. */
  private emitToSubscribers(subscriptionKey: string, data: unknown): void {
    const subscription = this.metricSubscriptions.get(subscriptionKey);
    if (!subscription) {
      return;
    }
    for (const clientId of subscription.clients) {
      this.activeSockets.get(clientId)?.emit('metrics-data', {
        metric: subscriptionKey,
        data
      });
    }
  }

  /** Removes the client from every shared subscription and stops all its pollers. */
  private releaseClient(clientId: string): void {
    for (const key of this.metricSubscriptions.keys()) {
      this.removeClientFromSubscription(key, clientId);
    }
    this.stopPollers(clientId);
  }

  /** Removes one client from one subscription; stops the loop when nobody is left. */
  private removeClientFromSubscription(subscriptionKey: string, clientId: string): void {
    const subscription = this.metricSubscriptions.get(subscriptionKey);
    if (!subscription) {
      return;
    }
    subscription.clients.delete(clientId);
    if (subscription.clients.size === 0) {
      subscription.stopUpdates();
      this.metricSubscriptions.delete(subscriptionKey);
    }
  }

  /** Stops the client's private pollers accepted by `shouldStop` (all of them by default). */
  private stopPollers(
    clientId: string,
    shouldStop: (poller: { metric: string; key: string }) => boolean = () => true
  ): void {
    const pollers = this.clientPollers.get(clientId);
    if (!pollers) {
      return;
    }
    for (const poller of pollers) {
      if (shouldStop(poller)) {
        poller.stop();
        pollers.delete(poller);
      }
    }
    if (pollers.size === 0) {
      this.clientPollers.delete(clientId);
    }
  }

  /**
   * Turns a client-supplied period into a delay `setInterval` handles sanely:
   * non-numeric, non-finite or non-positive values become the default period,
   * and values above the timer maximum are capped.
   */
  private normalizeInterval(interval: unknown): number {
    const ms = Number(interval);
    if (!Number.isFinite(ms) || ms <= 0) {
      return MetricsGateway.DEFAULT_INTERVAL_MS;
    }
    return Math.min(ms, MetricsGateway.MAX_INTERVAL_MS);
  }

  /** Extracts a printable message from a caught value of unknown type. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}