Compare commits
4 Commits
a9650366eb
...
0f4f4bcf15
| Author | SHA1 | Date |
|---|---|---|
|
|
0f4f4bcf15 | |
|
|
3daef48d4c | |
|
|
ad2f740384 | |
|
|
646597d111 |
2
.env
2
.env
|
|
@ -1,7 +1,7 @@
|
|||
# Прометеус
|
||||
PROMETHEUS_API=http://192.168.2.34:9090/api/v1
|
||||
|
||||
FRONTEND_URL=localhost:5173
|
||||
FRONTEND_URL=http://localhost:5173
|
||||
|
||||
# Постгресс
|
||||
DB_HOST=192.168.2.37
|
||||
|
|
|
|||
|
|
@ -36,7 +36,8 @@
|
|||
"reflect-metadata": "^0.2.2",
|
||||
"rxjs": "^7.8.1",
|
||||
"socket.io": "^4.8.1",
|
||||
"typeorm": "^0.3.21"
|
||||
"typeorm": "^0.3.21",
|
||||
"ws": "^8.18.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/eslintrc": "^3.2.0",
|
||||
|
|
@ -5956,6 +5957,27 @@
|
|||
"node": ">= 0.6"
|
||||
}
|
||||
},
|
||||
"node_modules/engine.io/node_modules/ws": {
|
||||
"version": "8.17.1",
|
||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
||||
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10.0.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"bufferutil": "^4.0.1",
|
||||
"utf-8-validate": ">=5.0.2"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"bufferutil": {
|
||||
"optional": true
|
||||
},
|
||||
"utf-8-validate": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/enhanced-resolve": {
|
||||
"version": "5.18.1",
|
||||
"resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz",
|
||||
|
|
@ -10984,6 +11006,27 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"node_modules/socket.io-adapter/node_modules/ws": {
|
||||
"version": "8.17.1",
|
||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
||||
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10.0.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"bufferutil": "^4.0.1",
|
||||
"utf-8-validate": ">=5.0.2"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"bufferutil": {
|
||||
"optional": true
|
||||
},
|
||||
"utf-8-validate": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/socket.io-parser": {
|
||||
"version": "4.2.4",
|
||||
"resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
|
||||
|
|
@ -12918,9 +12961,9 @@
|
|||
"license": "ISC"
|
||||
},
|
||||
"node_modules/ws": {
|
||||
"version": "8.17.1",
|
||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
|
||||
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
|
||||
"version": "8.18.3",
|
||||
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.18.3.tgz",
|
||||
"integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10.0.0"
|
||||
|
|
|
|||
43
package.json
43
package.json
|
|
@ -20,34 +20,35 @@
|
|||
"test:e2e": "jest --config ./test/jest-e2e.json"
|
||||
},
|
||||
"dependencies": {
|
||||
"@clickhouse/client": "^1.11.2",
|
||||
"@clickhouse/client-web": "^1.11.2",
|
||||
"@nestjs/axios": "^4.0.0",
|
||||
"@nestjs/common": "^11.0.1",
|
||||
"@nestjs/core": "^11.0.1",
|
||||
"@nestjs/config": "^4.0.0",
|
||||
"@nestjs/platform-express": "^11.0.1",
|
||||
"axios": "^1.7.9",
|
||||
"reflect-metadata": "^0.2.2",
|
||||
"dotenv": "^16.3.1",
|
||||
"rxjs": "^7.8.1",
|
||||
"@nestjs/typeorm": "^11.0.0",
|
||||
"pg": "^8.14.1",
|
||||
"typeorm": "^0.3.21",
|
||||
"bcrypt": "^5.1.1",
|
||||
"@types/bcrypt": "^5.0.2",
|
||||
"socket.io": "^4.8.1",
|
||||
"@nestjs/websockets": "11.0.12",
|
||||
"@nestjs/platform-socket.io": "11.0.12",
|
||||
"passport": "^0.7.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
"cookie-parser": "^1.4.7",
|
||||
"@types/passport-jwt": "^4.0.1",
|
||||
"@types/cookie-parser": "^1.4.8",
|
||||
"@nestjs/core": "^11.0.1",
|
||||
"@nestjs/jwt": "^11.0.0",
|
||||
"@nestjs/passport": "^11.0.5",
|
||||
"@nestjs/platform-express": "^11.0.1",
|
||||
"@nestjs/platform-socket.io": "11.0.12",
|
||||
"@nestjs/swagger": "11.1.4",
|
||||
"@clickhouse/client": "^1.11.2",
|
||||
"@nestjs/typeorm": "^11.0.0",
|
||||
"@nestjs/websockets": "11.0.12",
|
||||
"@types/bcrypt": "^5.0.2",
|
||||
"@types/cookie-parser": "^1.4.8",
|
||||
"@types/passport-jwt": "^4.0.1",
|
||||
"axios": "^1.7.9",
|
||||
"bcrypt": "^5.1.1",
|
||||
"cookie-parser": "^1.4.7",
|
||||
"date-fns": "4.1.0",
|
||||
"@clickhouse/client-web": "^1.11.2"
|
||||
"dotenv": "^16.3.1",
|
||||
"passport": "^0.7.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
"pg": "^8.14.1",
|
||||
"reflect-metadata": "^0.2.2",
|
||||
"rxjs": "^7.8.1",
|
||||
"socket.io": "^4.8.1",
|
||||
"typeorm": "^0.3.21",
|
||||
"ws": "^8.18.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/eslintrc": "^3.2.0",
|
||||
|
|
|
|||
|
|
@ -1,253 +1,391 @@
|
|||
import {
|
||||
WebSocketGateway,
|
||||
WebSocketServer,
|
||||
OnGatewayInit,
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
SubscribeMessage,
|
||||
} from '@nestjs/websockets';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { Server, WebSocket } from 'ws';
|
||||
import { createServer } from 'http';
|
||||
import { PrometheusService } from './prometheus.service';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: {
|
||||
origin: process.env.FRONTEND_URL,
|
||||
methods: ['GET', 'POST'],
|
||||
credentials: true
|
||||
},
|
||||
namespace: '/api/metrics-ws'
|
||||
})
|
||||
export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
|
||||
@WebSocketServer() server: Server;
|
||||
type Filters = Record<string, string>;
|
||||
|
||||
@Injectable()
|
||||
export class MetricsGateway implements OnModuleInit {
|
||||
private readonly logger = new Logger(MetricsGateway.name);
|
||||
private activeSockets: Map<string, Socket> = new Map();
|
||||
private metricSubscriptions = new Map<string, {
|
||||
stopUpdates: () => void;
|
||||
clients: Set<string>;
|
||||
}>();
|
||||
private lastSentData = new Map<string, any>(); // Кэш последних отправленных данных
|
||||
private wss: Server;
|
||||
|
||||
constructor(private readonly prometheusService: PrometheusService) { }
|
||||
private activeSockets = new Map<string, WebSocket>();
|
||||
private metricSubscriptions = new Map<
|
||||
string,
|
||||
{ stopUpdates: () => void; clients: Set<string> }
|
||||
>();
|
||||
|
||||
afterInit(server: Server) {
|
||||
this.logger.log('WebSocket Gateway initialized');
|
||||
private lastSentData = new Map<string, any>();
|
||||
|
||||
constructor(
|
||||
private readonly prometheusService: PrometheusService,
|
||||
private readonly configService: ConfigService
|
||||
) { }
|
||||
|
||||
onModuleInit() {
|
||||
const httpServer = createServer();
|
||||
this.wss = new Server({
|
||||
server: httpServer,
|
||||
path: '/metrics-ws',
|
||||
});
|
||||
|
||||
this.wss.on('connection', (client, request) =>
|
||||
this.handleConnection(client, request)
|
||||
);
|
||||
this.wss.on('error', (err) =>
|
||||
this.logger.error('WebSocket server error:', err)
|
||||
);
|
||||
|
||||
const wsPort = Number(this.configService.get('WS_PORT') || 3001);
|
||||
httpServer.listen(wsPort, () => {
|
||||
this.logger.log(
|
||||
`WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
handleConnection(client: Socket) {
|
||||
this.logger.log(`Client connected: ${client.id}`);
|
||||
this.activeSockets.set(client.id, client);
|
||||
|
||||
private handleConnection(client: WebSocket, request: any) {
|
||||
let clientId =
|
||||
this.getQueryParams(request?.url).clientId ||
|
||||
Math.random().toString(36).slice(2);
|
||||
|
||||
this.activeSockets.set(clientId, client);
|
||||
this.logger.log(`Client connected: ${clientId}`);
|
||||
|
||||
client.on('message', (raw) => {
|
||||
try {
|
||||
const msg = JSON.parse(raw.toString());
|
||||
this.handleMessage(clientId, client, msg);
|
||||
} catch (err) {
|
||||
this.sendError(client, 'Invalid JSON');
|
||||
}
|
||||
});
|
||||
|
||||
client.on('close', () => this.cleanupClient(clientId));
|
||||
client.on('error', (err) => {
|
||||
this.logger.error(`Client ${clientId} error:`, err);
|
||||
this.cleanupClient(clientId);
|
||||
});
|
||||
}
|
||||
|
||||
handleDisconnect(client: Socket) {
|
||||
this.logger.log(`Client disconnected: ${client.id}`);
|
||||
this.activeSockets.delete(client.id);
|
||||
private handleMessage(clientId: string, client: WebSocket, message: any) {
|
||||
const { event, data } = message || {};
|
||||
if (!event) return this.sendError(client, 'Event type is required');
|
||||
|
||||
// Очистка всех подписок этого клиента
|
||||
for (const [metric, subscription] of this.metricSubscriptions) {
|
||||
subscription.clients.delete(client.id);
|
||||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(metric);
|
||||
this.lastSentData.delete(metric);
|
||||
switch (event) {
|
||||
case 'unsubscribe-all':
|
||||
return this.unsubscribeAllForClient(clientId);
|
||||
|
||||
case 'get-metrics':
|
||||
return this.handleGetMetrics(client, data);
|
||||
|
||||
case 'subscribe-metric':
|
||||
return this.handleSubscribeMetric(clientId, client, data);
|
||||
|
||||
case 'unsubscribe-metric':
|
||||
return this.handleUnsubscribeMetric(clientId, data);
|
||||
|
||||
default:
|
||||
return this.sendError(client, `Unknown event type: ${event}`);
|
||||
}
|
||||
}
|
||||
|
||||
private cleanupClient(clientId: string) {
|
||||
if (!this.activeSockets.has(clientId)) return;
|
||||
this.logger.log(`Client disconnected: ${clientId}`);
|
||||
this.activeSockets.delete(clientId);
|
||||
|
||||
setTimeout(() => {
|
||||
if (!this.activeSockets.has(clientId)) {
|
||||
}
|
||||
}, 5000);
|
||||
|
||||
for (const [key, sub] of this.metricSubscriptions) {
|
||||
sub.clients.delete(clientId);
|
||||
if (sub.clients.size === 0) {
|
||||
sub.stopUpdates();
|
||||
this.metricSubscriptions.delete(key);
|
||||
this.lastSentData.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeMessage('unsubscribe-all')
|
||||
handleUnsubscribeAll(client: Socket) {
|
||||
for (const [metric, subscription] of this.metricSubscriptions) {
|
||||
subscription.clients.delete(client.id);
|
||||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(metric);
|
||||
this.lastSentData.delete(metric);
|
||||
private unsubscribeAllForClient(clientId: string) {
|
||||
for (const [key, sub] of this.metricSubscriptions) {
|
||||
if (sub.clients.has(clientId)) sub.clients.delete(clientId);
|
||||
if (sub.clients.size === 0) {
|
||||
sub.stopUpdates();
|
||||
this.metricSubscriptions.delete(key);
|
||||
this.lastSentData.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeMessage('get-metrics')
|
||||
async handleGetMetrics(client: Socket, payload: any) {
|
||||
const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload;
|
||||
|
||||
private async handleGetMetrics(client: WebSocket, payload: any) {
|
||||
const {
|
||||
metric,
|
||||
start,
|
||||
end,
|
||||
step,
|
||||
isRangeQuery,
|
||||
requestId,
|
||||
filters = {},
|
||||
} = payload || {};
|
||||
|
||||
if (!metric) {
|
||||
client.emit('metrics-error', {
|
||||
error: 'Metric name is required',
|
||||
requestId
|
||||
});
|
||||
return;
|
||||
return this.sendError(client, 'Metric name is required', requestId);
|
||||
}
|
||||
|
||||
if (isRangeQuery) {
|
||||
try {
|
||||
const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters);
|
||||
client.emit('metrics-data', { metric, data, requestId });
|
||||
return;
|
||||
} catch (error) {
|
||||
client.emit('metrics-error', {
|
||||
error: error.message,
|
||||
requestId
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||
// Отправляем текущие данные сразу при запросе
|
||||
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
client.emit('metrics-data', { metric, data: initialData, requestId });
|
||||
this.lastSentData.set(subscriptionKey, initialData);
|
||||
|
||||
const stopUpdates = await this.sendPeriodicUpdates(
|
||||
const rangeData = await this.prometheusService.fetchMetricsRange(
|
||||
metric,
|
||||
step || 5000,
|
||||
(data) => {
|
||||
const lastData = this.lastSentData.get(subscriptionKey);
|
||||
if (!this.isDataEqual(lastData, data)) {
|
||||
client.emit('metrics-data', { metric, data, requestId });
|
||||
this.lastSentData.set(subscriptionKey, data);
|
||||
}
|
||||
},
|
||||
start,
|
||||
end,
|
||||
step,
|
||||
filters
|
||||
);
|
||||
this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200));
|
||||
|
||||
const cleanup = () => {
|
||||
stopUpdates();
|
||||
this.lastSentData.delete(subscriptionKey);
|
||||
client.off('disconnect', cleanup);
|
||||
client.off('unsubscribe-metric', cleanup);
|
||||
};
|
||||
|
||||
client.on('disconnect', cleanup);
|
||||
client.on('unsubscribe-metric', cleanup);
|
||||
|
||||
} catch (error) {
|
||||
client.emit('metrics-error', {
|
||||
error: error.message,
|
||||
requestId
|
||||
return this.sendMessage(client, {
|
||||
event: 'metrics-data',
|
||||
data: rangeData,
|
||||
metric,
|
||||
requestId,
|
||||
});
|
||||
} catch (err: any) {
|
||||
return this.sendError(client, err?.message || 'Range query error', requestId);
|
||||
}
|
||||
}
|
||||
|
||||
private getSubscriptionKey(metric: string, filters: Record<string, string>): string {
|
||||
const filterKeys = Object.keys(filters).sort();
|
||||
const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&');
|
||||
return `${metric}${filterString ? `?${filterString}` : ''}`;
|
||||
}
|
||||
|
||||
// Сравниваем данные, чтобы избежать лишних отправок
|
||||
private isDataEqual(oldData: any[], newData: any[]): boolean {
|
||||
if (!oldData || !newData || oldData.length !== newData.length) return false;
|
||||
|
||||
return oldData.every((oldItem, index) => {
|
||||
const newItem = newData[index];
|
||||
return (
|
||||
oldItem.value === newItem.value &&
|
||||
oldItem.status === newItem.status &&
|
||||
oldItem.timestamp === newItem.timestamp
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@SubscribeMessage('subscribe-metric')
|
||||
async handleSubscribeMetric(
|
||||
client: Socket,
|
||||
payload: {
|
||||
metric: string;
|
||||
interval?: number;
|
||||
filters?: Record<string, string>;
|
||||
}
|
||||
) {
|
||||
const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд
|
||||
try {
|
||||
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||
|
||||
if (!this.metricSubscriptions.has(subscriptionKey)) {
|
||||
// Отправляем текущие данные сразу при подписке
|
||||
try {
|
||||
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
client.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data: initialData
|
||||
const initialData =
|
||||
await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
|
||||
this.sendMessage(client, {
|
||||
event: 'metrics-data',
|
||||
data: { metric, data: initialData, requestId },
|
||||
});
|
||||
this.lastSentData.set(subscriptionKey, initialData);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error fetching initial data for ${metric}:`, error.message);
|
||||
|
||||
let lastLocal = initialData;
|
||||
|
||||
const jitter = Math.floor(Math.random() * 5000);
|
||||
setTimeout(() => {
|
||||
const timer = setInterval(async () => {
|
||||
try {
|
||||
const fresh =
|
||||
await this.prometheusService.fetchMetricsWithFilters(
|
||||
metric,
|
||||
filters
|
||||
);
|
||||
if (!this.isDataEqual(lastLocal, fresh)) {
|
||||
this.sendMessage(client, {
|
||||
event: 'metrics-data',
|
||||
data: { metric, data: fresh, requestId },
|
||||
});
|
||||
lastLocal = fresh;
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Error in on-demand periodic update for ${subscriptionKey}:`,
|
||||
(e as any)?.message
|
||||
);
|
||||
}
|
||||
}, step || 5000);
|
||||
|
||||
const closeOnce = () => clearInterval(timer);
|
||||
client.once('close', closeOnce);
|
||||
client.once('error', closeOnce);
|
||||
}, jitter);
|
||||
} catch (err: any) {
|
||||
return this.sendError(client, err?.message || 'get-metrics error', requestId);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) {
|
||||
const { metric, interval = 60000, filters = {} } = payload || {};
|
||||
if (!metric) {
|
||||
this.sendError(client, 'Metric name is required');
|
||||
return;
|
||||
}
|
||||
|
||||
const key = this.getSubscriptionKey(metric, filters);
|
||||
|
||||
try {
|
||||
const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
|
||||
if (!Array.isArray(initial)) {
|
||||
throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`);
|
||||
}
|
||||
|
||||
this.sendMessage(client, {
|
||||
event: 'metrics-data',
|
||||
data: {
|
||||
metric: key,
|
||||
data: initial,
|
||||
type: 'initial'
|
||||
}
|
||||
});
|
||||
|
||||
this.lastSentData.set(key, initial);
|
||||
|
||||
const stopUpdates = await this.sendPeriodicUpdates(
|
||||
metric,
|
||||
interval,
|
||||
(data) => {
|
||||
// Отправляем только если данные изменились
|
||||
const lastData = this.lastSentData.get(subscriptionKey);
|
||||
if (!this.isDataEqual(lastData, data)) {
|
||||
this.server.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data
|
||||
(freshData) => {
|
||||
if (!Array.isArray(freshData)) {
|
||||
this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const lastData = this.lastSentData.get(key);
|
||||
if (!this.isDataEqual(lastData, freshData)) {
|
||||
this.broadcast({
|
||||
event: 'metrics-data',
|
||||
data: {
|
||||
metric: key,
|
||||
data: freshData,
|
||||
type: 'update'
|
||||
}
|
||||
});
|
||||
this.lastSentData.set(subscriptionKey, data);
|
||||
this.lastSentData.set(key, freshData);
|
||||
}
|
||||
},
|
||||
filters
|
||||
);
|
||||
|
||||
this.metricSubscriptions.set(subscriptionKey, {
|
||||
this.metricSubscriptions.set(key, {
|
||||
stopUpdates,
|
||||
clients: new Set([client.id])
|
||||
clients: new Set([clientId]),
|
||||
});
|
||||
} else {
|
||||
this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id);
|
||||
// Отправляем кэшированные данные новому клиенту
|
||||
const cachedData = this.lastSentData.get(subscriptionKey);
|
||||
if (cachedData) {
|
||||
client.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data: cachedData
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const unsubscribe = () => {
|
||||
const subscription = this.metricSubscriptions.get(subscriptionKey);
|
||||
this.logger.log(`Unsubscribing client ${clientId} from ${key}`);
|
||||
const subscription = this.metricSubscriptions.get(key);
|
||||
if (subscription) {
|
||||
subscription.clients.delete(client.id);
|
||||
subscription.clients.delete(clientId);
|
||||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(subscriptionKey);
|
||||
this.lastSentData.delete(subscriptionKey);
|
||||
this.metricSubscriptions.delete(key);
|
||||
this.lastSentData.delete(key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
client.on('disconnect', unsubscribe);
|
||||
client.on('unsubscribe-metric', unsubscribe);
|
||||
client.on('close', unsubscribe);
|
||||
client.on('error', unsubscribe);
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error(`Subscription error for ${key}:`, error);
|
||||
this.sendError(client, error.message);
|
||||
|
||||
if (!this.metricSubscriptions.has(key)) {
|
||||
this.lastSentData.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async sendPeriodicUpdates(
|
||||
private handleUnsubscribeMetric(
|
||||
clientId: string,
|
||||
payload: { metric: string; filters?: Filters }
|
||||
) {
|
||||
const { metric, filters = {} } = payload || {};
|
||||
if (!metric) return;
|
||||
|
||||
const key = this.getSubscriptionKey(metric, filters);
|
||||
const sub = this.metricSubscriptions.get(key);
|
||||
if (!sub) return;
|
||||
|
||||
sub.clients.delete(clientId);
|
||||
if (sub.clients.size === 0) {
|
||||
sub.stopUpdates();
|
||||
this.metricSubscriptions.delete(key);
|
||||
this.lastSentData.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private getQueryParams(rawUrl?: string): Record<string, string> {
|
||||
try {
|
||||
const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база
|
||||
return Object.fromEntries(url.searchParams.entries());
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
private getSubscriptionKey(metric: string, filters: Filters): string {
|
||||
const keys = Object.keys(filters).sort();
|
||||
const filterString = keys
|
||||
.map((k) => `${k}=${encodeURIComponent(filters[k])}`)
|
||||
.join('&');
|
||||
return `${metric}${filterString ? `?${filterString}` : ''}`;
|
||||
}
|
||||
|
||||
private isDataEqual(a: any[], b: any[]) {
|
||||
if (!a || !b || a.length !== b.length) return false;
|
||||
return a.every((item, i) => {
|
||||
const x = b[i];
|
||||
return (
|
||||
item?.value === x?.value &&
|
||||
item?.status === x?.status &&
|
||||
item?.timestamp === x?.timestamp
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private async sendPeriodicUpdates(
|
||||
metric: string,
|
||||
interval: number,
|
||||
callback: (data: any) => void,
|
||||
filters: Record<string, string> = {}
|
||||
cb: (data: any) => void,
|
||||
filters: Filters
|
||||
) {
|
||||
// Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки
|
||||
const initialDelay = Math.floor(Math.random() * 5000);
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, initialDelay));
|
||||
await new Promise((r) => setTimeout(r, initialDelay));
|
||||
|
||||
const timer = setInterval(async () => {
|
||||
try {
|
||||
const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
callback(data);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error in periodic update for ${metric}:`, error.message);
|
||||
const data = await this.prometheusService.fetchMetricsWithFilters(
|
||||
metric,
|
||||
filters
|
||||
);
|
||||
cb(data);
|
||||
} catch (e: any) {
|
||||
this.logger.error(
|
||||
`Error in periodic update for ${metric}:`,
|
||||
e?.message
|
||||
);
|
||||
}
|
||||
}, interval);
|
||||
|
||||
return () => {
|
||||
clearInterval(timer);
|
||||
this.lastSentData.delete(this.getSubscriptionKey(metric, filters));
|
||||
this.logger.log(`Stopped updates for ${metric}`);
|
||||
};
|
||||
return () => clearInterval(timer);
|
||||
}
|
||||
|
||||
private sendMessage(client: WebSocket, message: any) {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(JSON.stringify(message));
|
||||
}
|
||||
}
|
||||
|
||||
private broadcast(message: any) {
|
||||
const raw = JSON.stringify(message);
|
||||
this.wss.clients.forEach((c) => {
|
||||
if (c.readyState === WebSocket.OPEN) c.send(raw);
|
||||
});
|
||||
}
|
||||
|
||||
private sendError(client: WebSocket, error: string, requestId?: string) {
|
||||
this.sendMessage(client, {
|
||||
event: 'metrics-error',
|
||||
data: { error, requestId },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue