optimize subscriptions
parent
926ea01235
commit
8f538c44a8
2
.env
2
.env
|
|
@ -35,4 +35,4 @@
|
|||
#CLICKHOUSE_DB=zvks
|
||||
|
||||
# Для ai api
|
||||
#ANALYSIS_API_URL=http://192.168.2.39:5134/models/api/metrics/rest
|
||||
#AI_SERVICE_URL=http://192.168.2.39:5134
|
||||
|
|
@ -1,11 +1,18 @@
|
|||
import { Controller, Get } from '@nestjs/common';
|
||||
import { Controller, Get, Post } from '@nestjs/common';
|
||||
import { ClickHouseService } from './clickhouse.service';
|
||||
import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';
|
||||
import { HttpService } from '@nestjs/axios';
|
||||
import { firstValueFrom } from 'rxjs';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
|
||||
@ApiTags('Clickhouse')
|
||||
@Controller('clickhouse')
|
||||
export class ClickHouseController {
|
||||
constructor(private readonly clickhouseService: ClickHouseService) { }
|
||||
constructor(
|
||||
private readonly clickhouseService: ClickHouseService,
|
||||
private readonly httpService: HttpService,
|
||||
private readonly configService: ConfigService,
|
||||
) { }
|
||||
|
||||
@Get()
|
||||
@ApiOperation({ summary: 'Get metrics from ClickHouse' })
|
||||
|
|
@ -32,4 +39,24 @@ export class ClickHouseController {
|
|||
async getClckhouse() {
|
||||
return this.clickhouseService.getClckhouse();
|
||||
}
|
||||
|
||||
@Post('send-to-ai')
|
||||
@ApiOperation({ summary: 'Send metrics to AI service' })
|
||||
@ApiResponse({
|
||||
status: 200,
|
||||
description: 'AI service response',
|
||||
})
|
||||
async sendToAI() {
|
||||
const metrics = await this.clickhouseService.getClckhouse();
|
||||
const aiServiceUrl = this.configService.get('AI_SERVICE_URL/api/metrics/rest');
|
||||
|
||||
try {
|
||||
const response = await firstValueFrom(
|
||||
this.httpService.post(aiServiceUrl, metrics)
|
||||
);
|
||||
return response.data;
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to send data to AI: ${error.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,9 +1,11 @@
|
|||
import { Module, Global } from '@nestjs/common';
|
||||
import { createClient, ClickHouseClient } from '@clickhouse/client';
|
||||
import { ClickHouseService } from './clickhouse.service';
|
||||
import { HttpModule } from '@nestjs/axios';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
imports: [HttpModule],
|
||||
providers: [
|
||||
{
|
||||
provide: 'CLICKHOUSE_CLIENT',
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ async function bootstrap() {
|
|||
});
|
||||
|
||||
// Настройка CORS
|
||||
app.enableCors({
|
||||
app.enableCors({//ПОСТАВИТЬ ПРОКСИ, ЧТОБЫ КОРС НЕ РУГАЛСЯ, ИЗМЕНЕНИЕ ПОЛИТИКИ СЕТЕВЫХ ПАКЕТОВ. ПИШУ IP СВОЙ, А ПОРТ ПРОКСИ. REVERSE PROXY.
|
||||
origin: [process.env.FRONTEND_URL, "http://dev.msf.enode"],
|
||||
credentials: true,
|
||||
methods: 'GET,HEAD,PUT,PATCH,POST,DELETE,OPTIONS',
|
||||
|
|
|
|||
|
|
@ -6,26 +6,33 @@ import { MenuItem } from './menu.interface';
|
|||
export class MenuController {
|
||||
constructor(private readonly menuService: MenuService) { }
|
||||
|
||||
@Get('full')
|
||||
async getFullMenu(@Headers('if-modified-since') ifModifiedSince?: string) {
|
||||
try {
|
||||
const result = await this.menuService.getFullMenuWithCache(ifModifiedSince);
|
||||
@Get('full')
|
||||
async getFullMenu(@Headers('if-modified-since') ifModifiedSince?: string) {
|
||||
console.log('GET /menu/full requested');
|
||||
try {
|
||||
const result = await this.menuService.getFullMenuWithCache(ifModifiedSince);
|
||||
|
||||
if (!result.fresh && ifModifiedSince) {
|
||||
throw new HttpException('Not Modified', HttpStatus.NOT_MODIFIED);
|
||||
}
|
||||
if (!result.fresh && ifModifiedSince) {
|
||||
throw new HttpException('Not Modified', HttpStatus.NOT_MODIFIED);
|
||||
}
|
||||
|
||||
return result.menu;
|
||||
} catch (error) {
|
||||
if (error.status === HttpStatus.NOT_MODIFIED) {
|
||||
throw error;
|
||||
return result.menu;
|
||||
} catch (error) {
|
||||
if (error.status === HttpStatus.NOT_MODIFIED) {
|
||||
throw error;
|
||||
}
|
||||
throw new HttpException(
|
||||
error.message || 'Failed to load menu',
|
||||
HttpStatus.INTERNAL_SERVER_ERROR
|
||||
);
|
||||
}
|
||||
throw new HttpException(
|
||||
error.message || 'Failed to load menu',
|
||||
HttpStatus.INTERNAL_SERVER_ERROR
|
||||
);
|
||||
}
|
||||
}
|
||||
/*
|
||||
@Get('full')
|
||||
async getFullMenu() {
|
||||
console.log('Simplified endpoint called');
|
||||
return { test: 'OK' }; // Простейший ответ
|
||||
} */
|
||||
|
||||
@Get('check-updates')
|
||||
async checkUpdates(@Headers('if-modified-since') ifModifiedSince: string) {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
stopUpdates: () => void;
|
||||
clients: Set<string>;
|
||||
}>();
|
||||
private lastSentData = new Map<string, any>(); // Кэш последних отправленных данных
|
||||
|
||||
constructor(private readonly prometheusService: PrometheusService) { }
|
||||
|
||||
|
|
@ -48,6 +49,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(metric);
|
||||
this.lastSentData.delete(metric);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -59,6 +61,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(metric);
|
||||
this.lastSentData.delete(metric);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -90,17 +93,28 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
}
|
||||
|
||||
try {
|
||||
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||
// Отправляем текущие данные сразу при запросе
|
||||
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
client.emit('metrics-data', { metric, data: initialData, requestId });
|
||||
this.lastSentData.set(subscriptionKey, initialData);
|
||||
|
||||
const stopUpdates = await this.sendPeriodicUpdates(
|
||||
metric,
|
||||
step || 5000,
|
||||
(data) => {
|
||||
client.emit('metrics-data', { metric, data, requestId });
|
||||
const lastData = this.lastSentData.get(subscriptionKey);
|
||||
if (!this.isDataEqual(lastData, data)) {
|
||||
client.emit('metrics-data', { metric, data, requestId });
|
||||
this.lastSentData.set(subscriptionKey, data);
|
||||
}
|
||||
},
|
||||
filters
|
||||
);
|
||||
|
||||
const cleanup = () => {
|
||||
stopUpdates();
|
||||
this.lastSentData.delete(subscriptionKey);
|
||||
client.off('disconnect', cleanup);
|
||||
client.off('unsubscribe-metric', cleanup);
|
||||
};
|
||||
|
|
@ -120,7 +134,21 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
const filterKeys = Object.keys(filters).sort();
|
||||
const filterString = filterKeys.map(k => `${k}=${encodeURIComponent(filters[k])}`).join('&');
|
||||
return `${metric}${filterString ? `?${filterString}` : ''}`;
|
||||
}
|
||||
}
|
||||
|
||||
// Сравниваем данные, чтобы избежать лишних отправок
|
||||
private isDataEqual(oldData: any[], newData: any[]): boolean {
|
||||
if (!oldData || !newData || oldData.length !== newData.length) return false;
|
||||
|
||||
return oldData.every((oldItem, index) => {
|
||||
const newItem = newData[index];
|
||||
return (
|
||||
oldItem.value === newItem.value &&
|
||||
oldItem.status === newItem.status &&
|
||||
oldItem.timestamp === newItem.timestamp
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@SubscribeMessage('subscribe-metric')
|
||||
async handleSubscribeMetric(
|
||||
|
|
@ -131,18 +159,35 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
filters?: Record<string, string>;
|
||||
}
|
||||
) {
|
||||
const { metric, interval = 5000, filters = {} } = payload;
|
||||
const { metric, interval = 60000, filters = {} } = payload; // По умолчанию 60 секунд
|
||||
const subscriptionKey = this.getSubscriptionKey(metric, filters);
|
||||
|
||||
if (!this.metricSubscriptions.has(subscriptionKey)) {
|
||||
// Отправляем текущие данные сразу при подписке
|
||||
try {
|
||||
const initialData = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
client.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data: initialData
|
||||
});
|
||||
this.lastSentData.set(subscriptionKey, initialData);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error fetching initial data for ${metric}:`, error.message);
|
||||
}
|
||||
|
||||
const stopUpdates = await this.sendPeriodicUpdates(
|
||||
metric,
|
||||
interval,
|
||||
(data) => {
|
||||
this.server.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data
|
||||
});
|
||||
// Отправляем только если данные изменились
|
||||
const lastData = this.lastSentData.get(subscriptionKey);
|
||||
if (!this.isDataEqual(lastData, data)) {
|
||||
this.server.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data
|
||||
});
|
||||
this.lastSentData.set(subscriptionKey, data);
|
||||
}
|
||||
},
|
||||
filters
|
||||
);
|
||||
|
|
@ -153,6 +198,14 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
});
|
||||
} else {
|
||||
this.metricSubscriptions.get(subscriptionKey)?.clients.add(client.id);
|
||||
// Отправляем кэшированные данные новому клиенту
|
||||
const cachedData = this.lastSentData.get(subscriptionKey);
|
||||
if (cachedData) {
|
||||
client.emit('metrics-data', {
|
||||
metric: subscriptionKey,
|
||||
data: cachedData
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const unsubscribe = () => {
|
||||
|
|
@ -162,6 +215,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
if (subscription.clients.size === 0) {
|
||||
subscription.stopUpdates();
|
||||
this.metricSubscriptions.delete(subscriptionKey);
|
||||
this.lastSentData.delete(subscriptionKey);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -176,6 +230,11 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
callback: (data: any) => void,
|
||||
filters: Record<string, string> = {}
|
||||
) {
|
||||
// Добавляем небольшую случайную задержку, чтобы избежать пиковой нагрузки
|
||||
const initialDelay = Math.floor(Math.random() * 5000);
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, initialDelay));
|
||||
|
||||
const timer = setInterval(async () => {
|
||||
try {
|
||||
const data = await this.prometheusService.fetchMetricsWithFilters(metric, filters);
|
||||
|
|
@ -187,6 +246,7 @@ export class MetricsGateway implements OnGatewayInit, OnGatewayConnection, OnGat
|
|||
|
||||
return () => {
|
||||
clearInterval(timer);
|
||||
this.lastSentData.delete(this.getSubscriptionKey(metric, filters));
|
||||
this.logger.log(`Stopped updates for ${metric}`);
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ import { MenuItem } from '../menu/menu.interface';
|
|||
@Injectable()
|
||||
export class PrometheusService {
|
||||
private readonly prometheusUrl: string;
|
||||
private metricCache = new Map<string, { data: any; timestamp: number }>();
|
||||
private metadataCache = new Map<string, { type: string | null; description: string | undefined; timestamp: number }>();
|
||||
|
||||
constructor(
|
||||
private readonly httpService: HttpService,
|
||||
|
|
@ -18,6 +20,13 @@ export class PrometheusService {
|
|||
}
|
||||
|
||||
async fetchMetricType(metric: string): Promise<string | null> {
|
||||
const cacheKey = `metadata-type-${metric}`;
|
||||
const cacheEntry = this.metadataCache.get(cacheKey);
|
||||
|
||||
if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) {
|
||||
return cacheEntry.type;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/metadata`, {
|
||||
|
|
@ -25,14 +34,29 @@ export class PrometheusService {
|
|||
})
|
||||
);
|
||||
const metadata = response.data.data[metric];
|
||||
return metadata?.length ? metadata[0].type : null;
|
||||
const result = metadata?.length ? metadata[0].type : null;
|
||||
|
||||
this.metadataCache.set(cacheKey, {
|
||||
type: result,
|
||||
description: cacheEntry?.description,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error(`Ошибка при получении типа метрики ${metric}:`, error);
|
||||
return null;
|
||||
return cacheEntry?.type || null;
|
||||
}
|
||||
}
|
||||
|
||||
async fetchMetricDescription(metric: string): Promise<string | undefined> {
|
||||
const cacheKey = `metadata-description-${metric}`;
|
||||
const cacheEntry = this.metadataCache.get(cacheKey);
|
||||
|
||||
if (cacheEntry && Date.now() - cacheEntry.timestamp < 30000) {
|
||||
return cacheEntry.description;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/metadata`, {
|
||||
|
|
@ -40,14 +64,29 @@ export class PrometheusService {
|
|||
})
|
||||
);
|
||||
const metadata = response.data.data[metric];
|
||||
return metadata?.length ? metadata[0].help : undefined;
|
||||
const result = metadata?.length ? metadata[0].help : undefined;
|
||||
|
||||
this.metadataCache.set(cacheKey, {
|
||||
type: cacheEntry?.type ?? null,
|
||||
description: result,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error(`Ошибка при получении описания метрики ${metric}:`, error);
|
||||
return undefined;
|
||||
return cacheEntry?.description;
|
||||
}
|
||||
}
|
||||
|
||||
async fetchMetrics(metric: string): Promise<PrometheusMetric[]> {
|
||||
const cacheKey = `${metric}:{}`;
|
||||
const cacheEntry = this.metricCache.get(cacheKey);
|
||||
|
||||
if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) {
|
||||
return cacheEntry.data;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/query`, {
|
||||
|
|
@ -58,7 +97,7 @@ export class PrometheusService {
|
|||
const metricType = await this.fetchMetricType(metric);
|
||||
const metricDescription = await this.fetchMetricDescription(metric);
|
||||
|
||||
return response.data.data.result.map((entry): PrometheusMetric => ({
|
||||
const result = response.data.data.result.map((entry): PrometheusMetric => ({
|
||||
__name__: entry.metric.__name__ || metric,
|
||||
device: entry.metric.device,
|
||||
instance: entry.metric.instance,
|
||||
|
|
@ -71,13 +110,24 @@ export class PrometheusService {
|
|||
description: metricDescription,
|
||||
...entry.metric
|
||||
}));
|
||||
|
||||
this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() });
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error(`Error fetching metrics for ${metric}:`, error);
|
||||
if (cacheEntry) return cacheEntry.data;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async fetchMetricsWithFilters(metric: string, filters: Record<string, string>): Promise<PrometheusMetric[]> {
|
||||
const cacheKey = `${metric}:${JSON.stringify(filters)}`;
|
||||
const cacheEntry = this.metricCache.get(cacheKey);
|
||||
|
||||
if (cacheEntry && Date.now() - cacheEntry.timestamp < 5000) {
|
||||
return cacheEntry.data;
|
||||
}
|
||||
|
||||
try {
|
||||
const query = this.buildFilteredQuery(metric, filters);
|
||||
const response = await lastValueFrom(
|
||||
|
|
@ -89,7 +139,7 @@ export class PrometheusService {
|
|||
const metricType = await this.fetchMetricType(metric);
|
||||
const metricDescription = await this.fetchMetricDescription(metric);
|
||||
|
||||
return response.data.data.result.map((entry): PrometheusMetric => ({
|
||||
const result = response.data.data.result.map((entry): PrometheusMetric => ({
|
||||
__name__: entry.metric.__name__ || metric,
|
||||
device: entry.metric.device,
|
||||
instance: entry.metric.instance,
|
||||
|
|
@ -102,8 +152,12 @@ export class PrometheusService {
|
|||
description: metricDescription,
|
||||
...entry.metric
|
||||
}));
|
||||
|
||||
this.metricCache.set(cacheKey, { data: result, timestamp: Date.now() });
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error(`Error fetching metrics with filters for ${metric}:`, error);
|
||||
if (cacheEntry) return cacheEntry.data;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -112,7 +166,6 @@ export class PrometheusService {
|
|||
const filterParts = Object.entries(filters)
|
||||
.filter(([_, value]) => value !== undefined && value !== null && value !== "")
|
||||
.map(([key, value]) => {
|
||||
// Убираем автоматическое добавление "module$" для source_id
|
||||
return `${key}="${value}"`;
|
||||
});
|
||||
|
||||
|
|
@ -122,10 +175,15 @@ export class PrometheusService {
|
|||
}
|
||||
|
||||
async fetchMetricsRange(metric: string, start: number, end: number, step: number, filters: Record<string, string> = {}): Promise<PrometheusMetric[]> {
|
||||
// Рассчитываем оптимальный шаг, если не указан
|
||||
const duration = end - start;
|
||||
const optimalStep = Math.max(Math.floor(duration / 1000), 15); // Минимум 15 секунд
|
||||
|
||||
const query = this.buildFilteredQuery(metric, {
|
||||
...filters,
|
||||
instance: '192.168.2.34:9050'
|
||||
});
|
||||
|
||||
try {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/query_range`, {
|
||||
|
|
@ -133,7 +191,7 @@ export class PrometheusService {
|
|||
query,
|
||||
start,
|
||||
end,
|
||||
step: step.toString()
|
||||
step: optimalStep.toString()
|
||||
},
|
||||
})
|
||||
);
|
||||
|
|
@ -217,21 +275,40 @@ export class PrometheusService {
|
|||
}
|
||||
|
||||
async fetchAllMetrics(): Promise<string[]> {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/label/__name__/values`)
|
||||
);
|
||||
return response.data.data;
|
||||
try {
|
||||
const response = await lastValueFrom(
|
||||
this.httpService.get(`${this.prometheusUrl}/label/__name__/values`)
|
||||
);
|
||||
return response.data.data;
|
||||
} catch (error) {
|
||||
console.error('Error fetching all metrics:', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async fetchAllMetricsWithValues(): Promise<any[]> {
|
||||
const metricNames = await this.fetchAllMetrics();
|
||||
const zvksMetrics = metricNames.filter(metric => metric.startsWith('zvks'));
|
||||
const zvksMetrics = metricNames.filter(metric =>
|
||||
metric.startsWith('zvks') ||
|
||||
metric.includes('server_li') ||
|
||||
metric.includes('application_li')
|
||||
);
|
||||
|
||||
const promises = zvksMetrics.map(async (metric) => {
|
||||
const data = await this.fetchMetrics(metric);
|
||||
return { metric, data };
|
||||
try {
|
||||
const data = await this.fetchMetrics(metric);
|
||||
return { metric, data };
|
||||
} catch (error) {
|
||||
console.error(`Error fetching data for metric ${metric}:`, error);
|
||||
return { metric, data: [] };
|
||||
}
|
||||
});
|
||||
|
||||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
clearCache(): void {
|
||||
this.metricCache.clear();
|
||||
this.metadataCache.clear();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue