Compare commits

..

4 Commits

Author SHA1 Message Date
Vladislav Drozdov 0f4f4bcf15 Merge pull request 'swagger' (#37) from swagger into rc
Reviewed-on: http://git.enode/deployer3000/trust-module-backend/pulls/37
Reviewed-by: Vladislav Drozdov <ya2@ya.ru>
2025-08-14 16:02:14 +03:00
deployer3000 3daef48d4c Merge branch 'rc' into swagger 2025-08-14 16:00:41 +03:00
SovietSpiderCat ad2f740384 change endpoint for proxy 2025-08-14 14:02:42 +03:00
SovietSpiderCat 646597d111 rework ws 2025-08-14 13:51:13 +03:00
4 changed files with 396 additions and 214 deletions

2
.env
View File

@ -1,7 +1,7 @@
# Прометеус # Прометеус
PROMETHEUS_API=http://192.168.2.34:9090/api/v1 PROMETHEUS_API=http://192.168.2.34:9090/api/v1
FRONTEND_URL=localhost:5173 FRONTEND_URL=http://localhost:5173
# Постгресс # Постгресс
DB_HOST=192.168.2.37 DB_HOST=192.168.2.37

51
package-lock.json generated
View File

@ -36,7 +36,8 @@
"reflect-metadata": "^0.2.2", "reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1", "rxjs": "^7.8.1",
"socket.io": "^4.8.1", "socket.io": "^4.8.1",
"typeorm": "^0.3.21" "typeorm": "^0.3.21",
"ws": "^8.18.3"
}, },
"devDependencies": { "devDependencies": {
"@eslint/eslintrc": "^3.2.0", "@eslint/eslintrc": "^3.2.0",
@ -5956,6 +5957,27 @@
"node": ">= 0.6" "node": ">= 0.6"
} }
}, },
"node_modules/engine.io/node_modules/ws": {
"version": "8.17.1",
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/enhanced-resolve": { "node_modules/enhanced-resolve": {
"version": "5.18.1", "version": "5.18.1",
"resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz", "resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.1.tgz",
@ -10984,6 +11006,27 @@
} }
} }
}, },
"node_modules/socket.io-adapter/node_modules/ws": {
"version": "8.17.1",
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/socket.io-parser": { "node_modules/socket.io-parser": {
"version": "4.2.4", "version": "4.2.4",
"resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz", "resolved": "https://registry.npmmirror.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
@ -12918,9 +12961,9 @@
"license": "ISC" "license": "ISC"
}, },
"node_modules/ws": { "node_modules/ws": {
"version": "8.17.1", "version": "8.18.3",
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.17.1.tgz", "resolved": "https://registry.npmmirror.com/ws/-/ws-8.18.3.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==",
"license": "MIT", "license": "MIT",
"engines": { "engines": {
"node": ">=10.0.0" "node": ">=10.0.0"

View File

@ -20,34 +20,35 @@
"test:e2e": "jest --config ./test/jest-e2e.json" "test:e2e": "jest --config ./test/jest-e2e.json"
}, },
"dependencies": { "dependencies": {
"@clickhouse/client": "^1.11.2",
"@clickhouse/client-web": "^1.11.2",
"@nestjs/axios": "^4.0.0", "@nestjs/axios": "^4.0.0",
"@nestjs/common": "^11.0.1", "@nestjs/common": "^11.0.1",
"@nestjs/core": "^11.0.1",
"@nestjs/config": "^4.0.0", "@nestjs/config": "^4.0.0",
"@nestjs/platform-express": "^11.0.1", "@nestjs/core": "^11.0.1",
"axios": "^1.7.9",
"reflect-metadata": "^0.2.2",
"dotenv": "^16.3.1",
"rxjs": "^7.8.1",
"@nestjs/typeorm": "^11.0.0",
"pg": "^8.14.1",
"typeorm": "^0.3.21",
"bcrypt": "^5.1.1",
"@types/bcrypt": "^5.0.2",
"socket.io": "^4.8.1",
"@nestjs/websockets": "11.0.12",
"@nestjs/platform-socket.io": "11.0.12",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
"cookie-parser": "^1.4.7",
"@types/passport-jwt": "^4.0.1",
"@types/cookie-parser": "^1.4.8",
"@nestjs/jwt": "^11.0.0", "@nestjs/jwt": "^11.0.0",
"@nestjs/passport": "^11.0.5", "@nestjs/passport": "^11.0.5",
"@nestjs/platform-express": "^11.0.1",
"@nestjs/platform-socket.io": "11.0.12",
"@nestjs/swagger": "11.1.4", "@nestjs/swagger": "11.1.4",
"@clickhouse/client": "^1.11.2", "@nestjs/typeorm": "^11.0.0",
"@nestjs/websockets": "11.0.12",
"@types/bcrypt": "^5.0.2",
"@types/cookie-parser": "^1.4.8",
"@types/passport-jwt": "^4.0.1",
"axios": "^1.7.9",
"bcrypt": "^5.1.1",
"cookie-parser": "^1.4.7",
"date-fns": "4.1.0", "date-fns": "4.1.0",
"@clickhouse/client-web": "^1.11.2" "dotenv": "^16.3.1",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
"pg": "^8.14.1",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"socket.io": "^4.8.1",
"typeorm": "^0.3.21",
"ws": "^8.18.3"
}, },
"devDependencies": { "devDependencies": {
"@eslint/eslintrc": "^3.2.0", "@eslint/eslintrc": "^3.2.0",

View File

@ -1,253 +1,391 @@
import { import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
WebSocketGateway, import { Server, WebSocket } from 'ws';
WebSocketServer, import { createServer } from 'http';
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { PrometheusService } from './prometheus.service'; import { PrometheusService } from './prometheus.service';
import { Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config';
@WebSocketGateway({ type Filters = Record<string, string>;
cors: {
origin: process.env.FRONTEND_URL, @Injectable()
methods: ['GET', 'POST'], export class MetricsGateway implements OnModuleInit {
credentials: true
},
namespace: '/api/metrics-ws'
})
export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;
private readonly logger = new Logger(MetricsGateway.name); private readonly logger = new Logger(MetricsGateway.name);
private activeSockets: Map<string, Socket> = new Map(); private wss: Server;
private metricSubscriptions = new Map<string, {
stopUpdates: () => void;
clients: Set<string>;
}>();
private lastSentData = new Map<string, any>(); // Кэш последних отправленных данных
constructor(private readonly prometheusService: PrometheusService) { } private activeSockets = new Map<string, WebSocket>();
private metricSubscriptions = new Map<
string,
{ stopUpdates: () => void; clients: Set<string> }
>();
afterInit(server: Server) { private lastSentData = new Map<string, any>();
this.logger.log('WebSocket Gateway initialized');
constructor(
private readonly prometheusService: PrometheusService,
private readonly configService: ConfigService
) { }
onModuleInit() {
const httpServer = createServer();
this.wss = new Server({
server: httpServer,
path: '/metrics-ws',
});
this.wss.on('connection', (client, request) =>
this.handleConnection(client, request)
);
this.wss.on('error', (err) =>
this.logger.error('WebSocket server error:', err)
);
const wsPort = Number(this.configService.get('WS_PORT') || 3001);
httpServer.listen(wsPort, () => {
this.logger.log(
`WebSocket server running at ws://localhost:${wsPort}/api/metrics-ws`
);
});
} }
handleConnection(client: Socket) {
this.logger.log(`Client connected: ${client.id}`); private handleConnection(client: WebSocket, request: any) {
this.activeSockets.set(client.id, client); let clientId =
this.getQueryParams(request?.url).clientId ||
Math.random().toString(36).slice(2);
this.activeSockets.set(clientId, client);
this.logger.log(`Client connected: ${clientId}`);
client.on('message', (raw) => {
try {
const msg = JSON.parse(raw.toString());
this.handleMessage(clientId, client, msg);
} catch (err) {
this.sendError(client, 'Invalid JSON');
}
});
client.on('close', () => this.cleanupClient(clientId));
client.on('error', (err) => {
this.logger.error(`Client ${clientId} error:`, err);
this.cleanupClient(clientId);
});
} }
handleDisconnect(client: Socket) { private handleMessage(clientId: string, client: WebSocket, message: any) {
this.logger.log(`Client disconnected: ${client.id}`); const { event, data } = message || {};
this.activeSockets.delete(client.id); if (!event) return this.sendError(client, 'Event type is required');
// Очистка всех подписок этого клиента switch (event) {
for (const [metric, subscription] of this.metricSubscriptions) { case 'unsubscribe-all':
subscription.clients.delete(client.id); return this.unsubscribeAllForClient(clientId);
if (subscription.clients.size === 0) {
subscription.stopUpdates(); case 'get-metrics':
this.metricSubscriptions.delete(metric); return this.handleGetMetrics(client, data);
this.lastSentData.delete(metric);
case 'subscribe-metric':
return this.handleSubscribeMetric(clientId, client, data);
case 'unsubscribe-metric':
return this.handleUnsubscribeMetric(clientId, data);
default:
return this.sendError(client, `Unknown event type: ${event}`);
}
}
private cleanupClient(clientId: string) {
if (!this.activeSockets.has(clientId)) return;
this.logger.log(`Client disconnected: ${clientId}`);
this.activeSockets.delete(clientId);
setTimeout(() => {
if (!this.activeSockets.has(clientId)) {
}
}, 5000);
for (const [key, sub] of this.metricSubscriptions) {
sub.clients.delete(clientId);
if (sub.clients.size === 0) {
sub.stopUpdates();
this.metricSubscriptions.delete(key);
this.lastSentData.delete(key);
} }
} }
} }
@SubscribeMessage('unsubscribe-all') private unsubscribeAllForClient(clientId: string) {
handleUnsubscribeAll(client: Socket) { for (const [key, sub] of this.metricSubscriptions) {
for (const [metric, subscription] of this.metricSubscriptions) { if (sub.clients.has(clientId)) sub.clients.delete(clientId);
subscription.clients.delete(client.id); if (sub.clients.size === 0) {
if (subscription.clients.size === 0) { sub.stopUpdates();
subscription.stopUpdates(); this.metricSubscriptions.delete(key);
this.metricSubscriptions.delete(metric); this.lastSentData.delete(key);
this.lastSentData.delete(metric);
} }
} }
} }
@SubscribeMessage('get-metrics')
async handleGetMetrics(client: Socket, payload: any) { private async handleGetMetrics(client: WebSocket, payload: any) {
const { metric, start, end, step, isRangeQuery, requestId, filters = {} } = payload; const {
metric,
start,
end,
step,
isRangeQuery,
requestId,
filters = {},
} = payload || {};
if (!metric) { if (!metric) {
client.emit('metrics-error', { return this.sendError(client, 'Metric name is required', requestId);
error: 'Metric name is required',
requestId
});
return;
} }
if (isRangeQuery) { if (isRangeQuery) {
try { try {
const data = await this.prometheusService.fetchMetricsRange(metric, start, end, step, filters); const rangeData = await this.prometheusService.fetchMetricsRange(
client.emit('metrics-data', { metric, data, requestId }); metric,
return; start,
} catch (error) { end,
client.emit('metrics-error', { step,
error: error.message, filters
requestId );
this.logger.debug('RangeQuery result', JSON.stringify(rangeData).slice(0, 200));
return this.sendMessage(client, {
event: 'metrics-data',
data: rangeData,
metric,
requestId,
}); });
return; } catch (err: any) {
return this.sendError(client, err?.message || 'Range query error', requestId);
} }
} }
try { try {
const subscriptionKey = this.getSubscriptionKey(metric, filters); const subscriptionKey = this.getSubscriptionKey(metric, filters);
// Отправляем текущие данные сразу при запросе
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
client.emit('metrics-data', { metric, data: initialData, requestId });
this.lastSentData.set(subscriptionKey, initialData);
const stopUpdates = await this.sendPeriodicUpdates( const initialData =
metric, await this.prometheusService.fetchMetricsWithFilters(metric, filters);
step || 5000,
(data) => {
const lastData = this.lastSentData.get(subscriptionKey);
if (!this.isDataEqual(lastData, data)) {
client.emit('metrics-data', { metric, data, requestId });
this.lastSentData.set(subscriptionKey, data);
}
},
filters
);
const cleanup = () => { this.sendMessage(client, {
stopUpdates(); event: 'metrics-data',
this.lastSentData.delete(subscriptionKey); data: { metric, data: initialData, requestId },
client.off('disconnect', cleanup);
client.off('unsubscribe-metric', cleanup);
};
client.on('disconnect', cleanup);
client.on('unsubscribe-metric', cleanup);
} catch (error) {
client.emit('metrics-error', {
error: error.message,
requestId
}); });
let lastLocal = initialData;
const jitter = Math.floor(Math.random() * 5000);
setTimeout(() => {
const timer = setInterval(async () => {
try {
const fresh =
await this.prometheusService.fetchMetricsWithFilters(
metric,
filters
);
if (!this.isDataEqual(lastLocal, fresh)) {
this.sendMessage(client, {
event: 'metrics-data',
data: { metric, data: fresh, requestId },
});
lastLocal = fresh;
}
} catch (e) {
this.logger.error(
`Error in on-demand periodic update for ${subscriptionKey}:`,
(e as any)?.message
);
}
}, step || 5000);
const closeOnce = () => clearInterval(timer);
client.once('close', closeOnce);
client.once('error', closeOnce);
}, jitter);
} catch (err: any) {
return this.sendError(client, err?.message || 'get-metrics error', requestId);
} }
} }
private getSubscriptionKey(metric: string, filters: Record<string, string>): string { private async handleSubscribeMetric(clientId: string, client: WebSocket, payload: any) {
const filterKeys = Object.keys(filters).sort(); const { metric, interval = 60000, filters = {} } = payload || {};
const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&'); if (!metric) {
return `${metric}${filterString ? `?${filterString}` : ''}`; this.sendError(client, 'Metric name is required');
} return;
// Сравниваем данные, чтобы избежать лишних отправок
private isDataEqual(oldData: any[], newData: any[]): boolean {
if (!oldData || !newData || oldData.length !== newData.length) return false;
return oldData.every((oldItem, index) => {
const newItem = newData[index];
return (
oldItem.value === newItem.value &&
oldItem.status === newItem.status &&
oldItem.timestamp === newItem.timestamp
);
});
}
@SubscribeMessage('subscribe-metric')
async handleSubscribeMetric(
client: Socket,
payload: {
metric: string;
interval?: number;
filters?: Record<string, string>;
} }
) {
const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд
const subscriptionKey = this.getSubscriptionKey(metric, filters);
if (!this.metricSubscriptions.has(subscriptionKey)) { const key = this.getSubscriptionKey(metric, filters);
// Отправляем текущие данные сразу при подписке
try { try {
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters); const initial = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
client.emit('metrics-data', {
metric: subscriptionKey, if (!Array.isArray(initial)) {
data: initialData throw new Error(`Expected array for metric ${metric}, got ${typeof initial}`);
});
this.lastSentData.set(subscriptionKey, initialData);
} catch (error) {
this.logger.error(`Error fetching initial data for ${metric}:`, error.message);
} }
this.sendMessage(client, {
event: 'metrics-data',
data: {
metric: key,
data: initial,
type: 'initial'
}
});
this.lastSentData.set(key, initial);
const stopUpdates = await this.sendPeriodicUpdates( const stopUpdates = await this.sendPeriodicUpdates(
metric, metric,
interval, interval,
(data) => { (freshData) => {
// Отправляем только если данные изменились if (!Array.isArray(freshData)) {
const lastData = this.lastSentData.get(subscriptionKey); this.logger.error(`Periodic update: expected array for ${key}, got ${typeof freshData}`);
if (!this.isDataEqual(lastData, data)) { return;
this.server.emit('metrics-data', { }
metric: subscriptionKey,
data const lastData = this.lastSentData.get(key);
if (!this.isDataEqual(lastData, freshData)) {
this.broadcast({
event: 'metrics-data',
data: {
metric: key,
data: freshData,
type: 'update'
}
}); });
this.lastSentData.set(subscriptionKey, data); this.lastSentData.set(key, freshData);
} }
}, },
filters filters
); );
this.metricSubscriptions.set(subscriptionKey, { this.metricSubscriptions.set(key, {
stopUpdates, stopUpdates,
clients: new Set([client.id]) clients: new Set([clientId]),
}); });
} else {
this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id); const unsubscribe = () => {
// Отправляем кэшированные данные новому клиенту this.logger.log(`Unsubscribing client ${clientId} from ${key}`);
const cachedData = this.lastSentData.get(subscriptionKey); const subscription = this.metricSubscriptions.get(key);
if (cachedData) { if (subscription) {
client.emit('metrics-data', { subscription.clients.delete(clientId);
metric: subscriptionKey, if (subscription.clients.size === 0) {
data: cachedData subscription.stopUpdates();
}); this.metricSubscriptions.delete(key);
this.lastSentData.delete(key);
}
}
};
client.on('close', unsubscribe);
client.on('error', unsubscribe);
} catch (error) {
this.logger.error(`Subscription error for ${key}:`, error);
this.sendError(client, error.message);
if (!this.metricSubscriptions.has(key)) {
this.lastSentData.delete(key);
} }
} }
const unsubscribe = () => {
const subscription = this.metricSubscriptions.get(subscriptionKey);
if (subscription) {
subscription.clients.delete(client.id);
if (subscription.clients.size === 0) {
subscription.stopUpdates();
this.metricSubscriptions.delete(subscriptionKey);
this.lastSentData.delete(subscriptionKey);
}
}
};
client.on('disconnect', unsubscribe);
client.on('unsubscribe-metric', unsubscribe);
} }
async sendPeriodicUpdates( private handleUnsubscribeMetric(
clientId: string,
payload: { metric: string; filters?: Filters }
) {
const { metric, filters = {} } = payload || {};
if (!metric) return;
const key = this.getSubscriptionKey(metric, filters);
const sub = this.metricSubscriptions.get(key);
if (!sub) return;
sub.clients.delete(clientId);
if (sub.clients.size === 0) {
sub.stopUpdates();
this.metricSubscriptions.delete(key);
this.lastSentData.delete(key);
}
}
private getQueryParams(rawUrl?: string): Record<string, string> {
try {
const url = new URL(rawUrl || '', 'http://localhost'); // безопасная база
return Object.fromEntries(url.searchParams.entries());
} catch {
return {};
}
}
private getSubscriptionKey(metric: string, filters: Filters): string {
const keys = Object.keys(filters).sort();
const filterString = keys
.map((k) => `${k}=${encodeURIComponent(filters[k])}`)
.join('&');
return `${metric}${filterString ? `?${filterString}` : ''}`;
}
private isDataEqual(a: any[], b: any[]) {
if (!a || !b || a.length !== b.length) return false;
return a.every((item, i) => {
const x = b[i];
return (
item?.value === x?.value &&
item?.status === x?.status &&
item?.timestamp === x?.timestamp
);
});
}
private async sendPeriodicUpdates(
metric: string, metric: string,
interval: number, interval: number,
callback: (data: any) => void, cb: (data: any) => void,
filters: Record<string, string> = {} filters: Filters
) { ) {
// Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки
const initialDelay = Math.floor(Math.random() * 5000); const initialDelay = Math.floor(Math.random() * 5000);
await new Promise((r) => setTimeout(r, initialDelay));
await new Promise(resolve => setTimeout(resolve, initialDelay));
const timer = setInterval(async () => { const timer = setInterval(async () => {
try { try {
const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters); const data = await this.prometheusService.fetchMetricsWithFilters(
callback(data); metric,
} catch (error) { filters
this.logger.error(`Error in periodic update for ${metric}:`, error.message); );
cb(data);
} catch (e: any) {
this.logger.error(
`Error in periodic update for ${metric}:`,
e?.message
);
} }
}, interval); }, interval);
return () => { return () => clearInterval(timer);
clearInterval(timer); }
this.lastSentData.delete(this.getSubscriptionKey(metric, filters));
this.logger.log(`Stopped updates for ${metric}`); private sendMessage(client: WebSocket, message: any) {
}; if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(message));
}
}
private broadcast(message: any) {
const raw = JSON.stringify(message);
this.wss.clients.forEach((c) => {
if (c.readyState === WebSocket.OPEN) c.send(raw);
});
}
private sendError(client: WebSocket, error: string, requestId?: string) {
this.sendMessage(client, {
event: 'metrics-error',
data: { error, requestId },
});
} }
} }