mirror of
https://github.com/Buriburizaem0n/nezha_domains.git
synced 2026-05-06 05:38:50 +00:00
e61772e858
* feat: tsdb * fix(ci): remove --parseGoList=false from swag init to fix dependency resolution * fix(ci): fix swag init directory and temporary remove s390x support due to cgo issues * fix(ci): fix swag init output directory to cmd/dashboard/docs * fix(ci): set GOTOOLCHAIN=auto for gosec * feat: add system storage maintenance for SQLite and TSDB * shit * feat: add s390x support and improve service monitoring * ci: upgrade goreleaser-cross image to v1.25 * ci: add libzstd-dev:s390x for cross-compilation * ci: build libzstd for s390x from source * ci: add libzstd_linux_s390x.go for gozstd linking * ci: use vendor mode for s390x gozstd build * ci: clone zstd source for s390x build * refactor(tsdb): rename MaxDiskUsageGB to MinFreeDiskSpaceGB and optimize queries - Rename config to accurately reflect VictoriaMetrics behavior: minimum free disk space threshold - Add QueryServiceHistoryByServerID for batch query optimization - Fix hasStatus to avoid false status counting when only delay data exists - Fix service aggregation boundary: use successCount*2 >= count - Fix serviceID parsing with strconv.ParseUint error handling - Add TagFiltersCacheSize for better query performance * feat(api): add server metrics endpoint and simplify service history response - Add /server/:id/metrics API for querying TSDB server metrics - Simplify getServiceHistory by removing redundant data conversion - Change AvgDelay type from float32 to float64 - Remove generated swagger docs (to be regenerated) - Update TSDB query, writer and tests * chore: 临时禁用不支持前端 * ci: cache zstd build for s390x to speed up CI * fix(tsdb): fix race conditions, data correctness and optimize performance - Fix TOCTOU race between IsClosed() and write/query by holding RLock - Fix delay=0 excluded from stats by using hasDelay flag instead of value > 0 - Fix fmt.Sscanf -> strconv.ParseUint for server_id parsing with error logging - Fix buffer unbounded growth by flushing inside lock when over maxSize - Split makeMetricRow 
into makeServerMetricRow/makeServiceMetricRow - Extract InitGlobalSettings() from Open() for VictoriaMetrics globals - Remove redundant instance/GetInstance/SetInstance singleton - Add error logging for silently skipped block decode errors - Optimize WriteBatch* to build all rows in single write call - Optimize downsample to use linear scan instead of map for sorted data - Optimize query slice reuse across block iterations * 服务添加DisplayIndex (#1166) * 服务添加DisplayIndex * 根据ai建议修改 --------- Co-authored-by: huYang <306061454@qq.com> * fix(tsdb): restore SQLite fallback and monthly status reload on restart - Restore ServiceHistory model and SQLite write fallback when TSDB is disabled - Reload monthlyStatus (30-day) and serviceStatusToday from TSDB/SQLite on startup - Add SQLite fallback query for /service/:id/history and /server/:id/service - Remove breaking GET /service/:id endpoint, keep /service/:id/history only - Add QueryServiceDailyStats to TSDB for per-day aggregation - Add tests for monthly status and today stats loading from both TSDB and SQLite - Migrate ServiceHistory table only when TSDB is disabled * ci: exclude false-positive gosec rules G117, G703, G704 * feat(api): expose tsdb_enabled in setting response * ci: restore G115 exclusion accidentally dropped in previous commit * fix: update version numbers for OfficialAdmin and Official templates * chore: upgrade frontend * chore: upgrade frontend --------- Co-authored-by: 胡说丷刂 <34758853+laosan-xx@users.noreply.github.com> Co-authored-by: huYang <306061454@qq.com>
302 lines
9.1 KiB
Go
302 lines
9.1 KiB
Go
package tsdb
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
)
|
|
|
|
type bufferedWriter struct {
|
|
db *TSDB
|
|
buffer []storage.MetricRow
|
|
mu sync.Mutex
|
|
maxSize int
|
|
flushTicker *time.Ticker
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func newBufferedWriter(db *TSDB, maxSize int, flushInterval time.Duration) *bufferedWriter {
|
|
w := &bufferedWriter{
|
|
db: db,
|
|
buffer: make([]storage.MetricRow, 0, maxSize),
|
|
maxSize: maxSize,
|
|
flushTicker: time.NewTicker(flushInterval),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
w.wg.Add(1)
|
|
go w.flushLoop()
|
|
return w
|
|
}
|
|
|
|
func (w *bufferedWriter) flushLoop() {
|
|
defer w.wg.Done()
|
|
for {
|
|
select {
|
|
case <-w.flushTicker.C:
|
|
w.flush()
|
|
case <-w.stopCh:
|
|
w.flush()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *bufferedWriter) write(rows []storage.MetricRow) {
|
|
w.mu.Lock()
|
|
w.buffer = append(w.buffer, rows...)
|
|
if len(w.buffer) >= w.maxSize {
|
|
rows := w.buffer
|
|
w.buffer = make([]storage.MetricRow, 0, w.maxSize)
|
|
w.mu.Unlock()
|
|
w.db.storage.AddRows(rows, 64)
|
|
return
|
|
}
|
|
w.mu.Unlock()
|
|
}
|
|
|
|
func (w *bufferedWriter) flush() {
|
|
w.mu.Lock()
|
|
if len(w.buffer) == 0 {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
rows := w.buffer
|
|
w.buffer = make([]storage.MetricRow, 0, w.maxSize)
|
|
w.mu.Unlock()
|
|
|
|
w.db.storage.AddRows(rows, 64)
|
|
}
|
|
|
|
func (w *bufferedWriter) stop() {
|
|
w.flushTicker.Stop()
|
|
close(w.stopCh)
|
|
w.wg.Wait()
|
|
}
|
|
|
|
// MetricType is the name of a metric series stored in the TSDB. It is written
// as the value of the __name__ label of every row.
type MetricType string

// Server metrics: one series per metric and server, labelled with server_id.
const (
	MetricServerCPU            MetricType = "nezha_server_cpu"
	MetricServerMemory         MetricType = "nezha_server_memory"
	MetricServerSwap           MetricType = "nezha_server_swap"
	MetricServerDisk           MetricType = "nezha_server_disk"
	MetricServerNetInSpeed     MetricType = "nezha_server_net_in_speed"
	MetricServerNetOutSpeed    MetricType = "nezha_server_net_out_speed"
	MetricServerNetInTransfer  MetricType = "nezha_server_net_in_transfer"
	MetricServerNetOutTransfer MetricType = "nezha_server_net_out_transfer"
	MetricServerLoad1          MetricType = "nezha_server_load1"
	MetricServerLoad5          MetricType = "nezha_server_load5"
	MetricServerLoad15         MetricType = "nezha_server_load15"
	MetricServerTCPConn        MetricType = "nezha_server_tcp_conn"
	MetricServerUDPConn        MetricType = "nezha_server_udp_conn"
	MetricServerProcessCount   MetricType = "nezha_server_process_count"
	MetricServerTemperature    MetricType = "nezha_server_temperature"
	MetricServerUptime         MetricType = "nezha_server_uptime"
	MetricServerGPU            MetricType = "nezha_server_gpu"
)

// Service monitoring metrics: labelled with service_id and server_id.
const (
	MetricServiceDelay  MetricType = "nezha_service_delay"
	MetricServiceStatus MetricType = "nezha_service_status"
)
|
|
|
|
// ServerMetrics is one sample of a server's state. WriteServerMetrics and
// WriteBatchServerMetrics turn it into one row per metric, each labelled with
// ServerID and stamped with Timestamp. Integer fields are converted to
// float64 when written.
type ServerMetrics struct {
	ServerID  uint64    // written as the server_id label
	Timestamp time.Time // sample time; written as Unix milliseconds

	CPU float64

	// Memory, swap and disk usage.
	MemUsed, SwapUsed, DiskUsed uint64

	// Network speed and transfer counters.
	NetInSpeed, NetOutSpeed       uint64
	NetInTransfer, NetOutTransfer uint64

	// Load figures.
	Load1, Load5, Load15 float64

	// Connection and process counts.
	TCPConnCount, UDPConnCount, ProcessCount uint64

	Temperature float64
	Uptime      uint64
	GPU         float64
}
|
|
|
|
// ServiceMetrics is the result of one service monitoring probe.
// WriteServiceMetrics and WriteBatchServiceMetrics store it as two rows, a
// delay row and a status row.
type ServiceMetrics struct {
	ServiceID, ServerID uint64    // written as the service_id and server_id labels
	Timestamp           time.Time // probe time; written as Unix milliseconds
	Delay               float64   // written as nezha_service_delay
	Successful          bool      // written as nezha_service_status: 1 if true, else 0
}
|
|
|
|
func (db *TSDB) WriteServerMetrics(m *ServerMetrics) error {
|
|
db.mu.RLock()
|
|
defer db.mu.RUnlock()
|
|
if db.closed {
|
|
return fmt.Errorf("TSDB is closed")
|
|
}
|
|
|
|
ts := m.Timestamp.UnixMilli()
|
|
serverIDStr := strconv.FormatUint(m.ServerID, 10)
|
|
|
|
rows := []storage.MetricRow{
|
|
makeServerMetricRow(MetricServerCPU, serverIDStr, ts, m.CPU),
|
|
makeServerMetricRow(MetricServerMemory, serverIDStr, ts, float64(m.MemUsed)),
|
|
makeServerMetricRow(MetricServerSwap, serverIDStr, ts, float64(m.SwapUsed)),
|
|
makeServerMetricRow(MetricServerDisk, serverIDStr, ts, float64(m.DiskUsed)),
|
|
makeServerMetricRow(MetricServerNetInSpeed, serverIDStr, ts, float64(m.NetInSpeed)),
|
|
makeServerMetricRow(MetricServerNetOutSpeed, serverIDStr, ts, float64(m.NetOutSpeed)),
|
|
makeServerMetricRow(MetricServerNetInTransfer, serverIDStr, ts, float64(m.NetInTransfer)),
|
|
makeServerMetricRow(MetricServerNetOutTransfer, serverIDStr, ts, float64(m.NetOutTransfer)),
|
|
makeServerMetricRow(MetricServerLoad1, serverIDStr, ts, m.Load1),
|
|
makeServerMetricRow(MetricServerLoad5, serverIDStr, ts, m.Load5),
|
|
makeServerMetricRow(MetricServerLoad15, serverIDStr, ts, m.Load15),
|
|
makeServerMetricRow(MetricServerTCPConn, serverIDStr, ts, float64(m.TCPConnCount)),
|
|
makeServerMetricRow(MetricServerUDPConn, serverIDStr, ts, float64(m.UDPConnCount)),
|
|
makeServerMetricRow(MetricServerProcessCount, serverIDStr, ts, float64(m.ProcessCount)),
|
|
makeServerMetricRow(MetricServerTemperature, serverIDStr, ts, m.Temperature),
|
|
makeServerMetricRow(MetricServerUptime, serverIDStr, ts, float64(m.Uptime)),
|
|
makeServerMetricRow(MetricServerGPU, serverIDStr, ts, m.GPU),
|
|
}
|
|
|
|
if db.writer != nil {
|
|
db.writer.write(rows)
|
|
} else {
|
|
db.storage.AddRows(rows, 64)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *TSDB) WriteServiceMetrics(m *ServiceMetrics) error {
|
|
db.mu.RLock()
|
|
defer db.mu.RUnlock()
|
|
if db.closed {
|
|
return fmt.Errorf("TSDB is closed")
|
|
}
|
|
|
|
ts := m.Timestamp.UnixMilli()
|
|
serviceIDStr := strconv.FormatUint(m.ServiceID, 10)
|
|
serverIDStr := strconv.FormatUint(m.ServerID, 10)
|
|
|
|
var status float64
|
|
if m.Successful {
|
|
status = 1
|
|
}
|
|
|
|
rows := []storage.MetricRow{
|
|
makeServiceMetricRow(MetricServiceDelay, serviceIDStr, serverIDStr, ts, m.Delay),
|
|
makeServiceMetricRow(MetricServiceStatus, serviceIDStr, serverIDStr, ts, status),
|
|
}
|
|
|
|
if db.writer != nil {
|
|
db.writer.write(rows)
|
|
} else {
|
|
db.storage.AddRows(rows, 64)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func makeServerMetricRow(metric MetricType, serverID string, timestamp int64, value float64) storage.MetricRow {
|
|
labels := []prompb.Label{
|
|
{Name: "__name__", Value: string(metric)},
|
|
{Name: "server_id", Value: serverID},
|
|
}
|
|
return storage.MetricRow{
|
|
MetricNameRaw: storage.MarshalMetricNameRaw(nil, labels),
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
}
|
|
|
|
func makeServiceMetricRow(metric MetricType, serviceID, serverID string, timestamp int64, value float64) storage.MetricRow {
|
|
labels := []prompb.Label{
|
|
{Name: "__name__", Value: string(metric)},
|
|
{Name: "service_id", Value: serviceID},
|
|
{Name: "server_id", Value: serverID},
|
|
}
|
|
return storage.MetricRow{
|
|
MetricNameRaw: storage.MarshalMetricNameRaw(nil, labels),
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
}
|
|
|
|
func (db *TSDB) WriteBatchServerMetrics(metrics []*ServerMetrics) error {
|
|
db.mu.RLock()
|
|
defer db.mu.RUnlock()
|
|
if db.closed {
|
|
return fmt.Errorf("TSDB is closed")
|
|
}
|
|
|
|
rows := make([]storage.MetricRow, 0, len(metrics)*17)
|
|
for _, m := range metrics {
|
|
ts := m.Timestamp.UnixMilli()
|
|
serverIDStr := strconv.FormatUint(m.ServerID, 10)
|
|
rows = append(rows,
|
|
makeServerMetricRow(MetricServerCPU, serverIDStr, ts, m.CPU),
|
|
makeServerMetricRow(MetricServerMemory, serverIDStr, ts, float64(m.MemUsed)),
|
|
makeServerMetricRow(MetricServerSwap, serverIDStr, ts, float64(m.SwapUsed)),
|
|
makeServerMetricRow(MetricServerDisk, serverIDStr, ts, float64(m.DiskUsed)),
|
|
makeServerMetricRow(MetricServerNetInSpeed, serverIDStr, ts, float64(m.NetInSpeed)),
|
|
makeServerMetricRow(MetricServerNetOutSpeed, serverIDStr, ts, float64(m.NetOutSpeed)),
|
|
makeServerMetricRow(MetricServerNetInTransfer, serverIDStr, ts, float64(m.NetInTransfer)),
|
|
makeServerMetricRow(MetricServerNetOutTransfer, serverIDStr, ts, float64(m.NetOutTransfer)),
|
|
makeServerMetricRow(MetricServerLoad1, serverIDStr, ts, m.Load1),
|
|
makeServerMetricRow(MetricServerLoad5, serverIDStr, ts, m.Load5),
|
|
makeServerMetricRow(MetricServerLoad15, serverIDStr, ts, m.Load15),
|
|
makeServerMetricRow(MetricServerTCPConn, serverIDStr, ts, float64(m.TCPConnCount)),
|
|
makeServerMetricRow(MetricServerUDPConn, serverIDStr, ts, float64(m.UDPConnCount)),
|
|
makeServerMetricRow(MetricServerProcessCount, serverIDStr, ts, float64(m.ProcessCount)),
|
|
makeServerMetricRow(MetricServerTemperature, serverIDStr, ts, m.Temperature),
|
|
makeServerMetricRow(MetricServerUptime, serverIDStr, ts, float64(m.Uptime)),
|
|
makeServerMetricRow(MetricServerGPU, serverIDStr, ts, m.GPU),
|
|
)
|
|
}
|
|
|
|
if db.writer != nil {
|
|
db.writer.write(rows)
|
|
} else {
|
|
db.storage.AddRows(rows, 64)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *TSDB) WriteBatchServiceMetrics(metrics []*ServiceMetrics) error {
|
|
db.mu.RLock()
|
|
defer db.mu.RUnlock()
|
|
if db.closed {
|
|
return fmt.Errorf("TSDB is closed")
|
|
}
|
|
|
|
rows := make([]storage.MetricRow, 0, len(metrics)*2)
|
|
for _, m := range metrics {
|
|
ts := m.Timestamp.UnixMilli()
|
|
serviceIDStr := strconv.FormatUint(m.ServiceID, 10)
|
|
serverIDStr := strconv.FormatUint(m.ServerID, 10)
|
|
var status float64
|
|
if m.Successful {
|
|
status = 1
|
|
}
|
|
rows = append(rows,
|
|
makeServiceMetricRow(MetricServiceDelay, serviceIDStr, serverIDStr, ts, m.Delay),
|
|
makeServiceMetricRow(MetricServiceStatus, serviceIDStr, serverIDStr, ts, status),
|
|
)
|
|
}
|
|
|
|
if db.writer != nil {
|
|
db.writer.write(rows)
|
|
} else {
|
|
db.storage.AddRows(rows, 64)
|
|
}
|
|
return nil
|
|
}
|