Merge upstream/master and resolve conflicts

This commit is contained in:
2026-04-16 11:58:30 +08:00
52 changed files with 4612 additions and 419 deletions
+4
View File
@@ -114,6 +114,10 @@ func (provider *Provider) splitDomainSOA(ctx context.Context, domain string) (pr
if soa, ok := r.Answer[0].(*dns.SOA); ok {
zone := soa.Hdr.Name
prefix := libdns.RelativeName(domain, zone)
// Convert "@" to empty string for zone apex
if prefix == "@" {
prefix = ""
}
return prefix, zone, nil
}
}
-5
View File
@@ -2,7 +2,6 @@ package ddns
import (
"context"
"os"
"testing"
)
@@ -13,10 +12,6 @@ type testSt struct {
}
func TestSplitDomainSOA(t *testing.T) {
if ci := os.Getenv("CI"); ci != "" { // skip if test on CI
return
}
cases := []testSt{
{
domain: "www.example.co.uk",
Binary file not shown.
@@ -0,0 +1,312 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-01-30 21:58+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: Automatically generated\n"
"Language-Team: none\n"
"Language: gl\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=2; plural=n != 1;\n"
#: cmd/dashboard/controller/alertrule.go:104
#, c-format
msgid "alert id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:108
#: cmd/dashboard/controller/alertrule.go:156
#: cmd/dashboard/controller/alertrule.go:176
#: cmd/dashboard/controller/controller.go:226
#: cmd/dashboard/controller/cron.go:58 cmd/dashboard/controller/cron.go:124
#: cmd/dashboard/controller/cron.go:136 cmd/dashboard/controller/cron.go:195
#: cmd/dashboard/controller/cron.go:224 cmd/dashboard/controller/ddns.go:131
#: cmd/dashboard/controller/ddns.go:192 cmd/dashboard/controller/fm.go:43
#: cmd/dashboard/controller/nat.go:59 cmd/dashboard/controller/nat.go:111
#: cmd/dashboard/controller/nat.go:122 cmd/dashboard/controller/nat.go:162
#: cmd/dashboard/controller/notification.go:112
#: cmd/dashboard/controller/notification.go:166
#: cmd/dashboard/controller/notification_group.go:76
#: cmd/dashboard/controller/notification_group.go:152
#: cmd/dashboard/controller/notification_group.go:164
#: cmd/dashboard/controller/notification_group.go:233
#: cmd/dashboard/controller/server.go:66 cmd/dashboard/controller/server.go:78
#: cmd/dashboard/controller/server.go:137
#: cmd/dashboard/controller/server.go:201
#: cmd/dashboard/controller/server_group.go:75
#: cmd/dashboard/controller/server_group.go:150
#: cmd/dashboard/controller/server_group.go:229
#: cmd/dashboard/controller/service.go:271
#: cmd/dashboard/controller/service.go:342
#: cmd/dashboard/controller/service.go:369
#: cmd/dashboard/controller/terminal.go:41
msgid "permission denied"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:184
msgid "duration need to be at least 3"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:188
msgid "cycle_interval need to be at least 1"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:191
msgid "cycle_start is not set"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:194
msgid "cycle_start is a future value"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:199
msgid "need to configure at least a single rule"
msgstr ""
#: cmd/dashboard/controller/controller.go:220
#: cmd/dashboard/controller/oauth2.go:153
#: cmd/dashboard/controller/server_group.go:162
#: cmd/dashboard/controller/service.go:97 cmd/dashboard/controller/user.go:27
#: cmd/dashboard/controller/user.go:63
msgid "unauthorized"
msgstr ""
#: cmd/dashboard/controller/controller.go:243
msgid "database error"
msgstr ""
#: cmd/dashboard/controller/cron.go:75 cmd/dashboard/controller/cron.go:149
msgid "scheduled tasks cannot be triggered by alarms"
msgstr ""
#: cmd/dashboard/controller/cron.go:132 cmd/dashboard/controller/cron.go:190
#, c-format
msgid "task id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/ddns.go:57 cmd/dashboard/controller/ddns.go:122
msgid "the retry count must be an integer between 1 and 10"
msgstr ""
#: cmd/dashboard/controller/ddns.go:81 cmd/dashboard/controller/ddns.go:154
msgid "error parsing %s: %v"
msgstr ""
#: cmd/dashboard/controller/ddns.go:127 cmd/dashboard/controller/nat.go:118
#, c-format
msgid "profile id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/fm.go:39 cmd/dashboard/controller/terminal.go:37
msgid "server not found or not connected"
msgstr ""
#: cmd/dashboard/controller/notification.go:69
#: cmd/dashboard/controller/notification.go:131
msgid "a test message"
msgstr ""
#: cmd/dashboard/controller/notification.go:108
#, c-format
msgid "notification id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:94
#: cmd/dashboard/controller/notification_group.go:175
msgid "have invalid notification id"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:160
#: cmd/dashboard/controller/server_group.go:158
#, c-format
msgid "group id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:42 cmd/dashboard/controller/oauth2.go:83
msgid "provider is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:52 cmd/dashboard/controller/oauth2.go:87
#: cmd/dashboard/controller/oauth2.go:132
msgid "provider not found"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:100
msgid "operation not permitted"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:138
msgid "code is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:175
msgid "oauth2 user not binded yet"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:217
#: cmd/dashboard/controller/oauth2.go:223
#: cmd/dashboard/controller/oauth2.go:228
msgid "invalid state key"
msgstr ""
#: cmd/dashboard/controller/server.go:74
#, c-format
msgid "server id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/server.go:250
msgid "operation timeout"
msgstr ""
#: cmd/dashboard/controller/server.go:257
msgid "get server config failed: %v"
msgstr ""
#: cmd/dashboard/controller/server.go:261
msgid "get server config failed"
msgstr ""
#: cmd/dashboard/controller/server_group.go:92
#: cmd/dashboard/controller/server_group.go:172
msgid "have invalid server id"
msgstr ""
#: cmd/dashboard/controller/service.go:90
#: cmd/dashboard/controller/service.go:165
msgid "server not found"
msgstr ""
#: cmd/dashboard/controller/service.go:267
#, c-format
msgid "service id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/user.go:68
msgid "incorrect password"
msgstr ""
#: cmd/dashboard/controller/user.go:82
msgid "you don't have any oauth2 bindings"
msgstr ""
#: cmd/dashboard/controller/user.go:131
msgid "password length must be greater than 6"
msgstr ""
#: cmd/dashboard/controller/user.go:134
msgid "username can't be empty"
msgstr ""
#: cmd/dashboard/controller/user.go:137
msgid "invalid role"
msgstr ""
#: cmd/dashboard/controller/user.go:176
msgid "can't delete yourself"
msgstr ""
#: service/rpc/io_stream.go:128
msgid "timeout: no connection established"
msgstr ""
#: service/rpc/io_stream.go:131
msgid "timeout: user connection not established"
msgstr ""
#: service/rpc/io_stream.go:134
msgid "timeout: agent connection not established"
msgstr ""
#: service/rpc/nezha.go:71
msgid "Scheduled Task Executed Successfully"
msgstr ""
#: service/rpc/nezha.go:75
msgid "Scheduled Task Executed Failed"
msgstr ""
#: service/rpc/nezha.go:274
msgid "IP Changed"
msgstr ""
#: service/singleton/alertsentinel.go:169
msgid "Incident"
msgstr ""
#: service/singleton/alertsentinel.go:179
msgid "Resolved"
msgstr ""
#: service/singleton/crontask.go:54
msgid "Tasks failed to register: ["
msgstr ""
#: service/singleton/crontask.go:61
msgid ""
"] These tasks will not execute properly. Fix them in the admin dashboard."
msgstr ""
#: service/singleton/crontask.go:144 service/singleton/crontask.go:169
#, c-format
msgid "[Task failed] %s: server %s is offline and cannot execute the task"
msgstr ""
#: service/singleton/servicesentinel.go:468
#, c-format
msgid "[Latency] %s %2f > %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:475
#, c-format
msgid "[Latency] %s %2f < %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:501
#, c-format
msgid "[%s] %s Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:544
#, c-format
msgid "[TLS] Fetch cert info failed, Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:584
#, c-format
msgid "The TLS certificate will expire within seven days. Expiration time: %s"
msgstr ""
#: service/singleton/servicesentinel.go:597
#, c-format
msgid ""
"TLS certificate changed, old: issuer %s, expires at %s; new: issuer %s, "
"expires at %s"
msgstr ""
#: service/singleton/servicesentinel.go:633
msgid "No Data"
msgstr ""
#: service/singleton/servicesentinel.go:635
msgid "Good"
msgstr ""
#: service/singleton/servicesentinel.go:637
msgid "Low Availability"
msgstr ""
#: service/singleton/servicesentinel.go:639
msgid "Down"
msgstr ""
#: service/singleton/user.go:60
msgid "user id not specified"
msgstr ""
Binary file not shown.
@@ -0,0 +1,312 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-01-30 21:58+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: Automatically generated\n"
"Language-Team: none\n"
"Language: ja\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=1; plural=0;\n"
#: cmd/dashboard/controller/alertrule.go:104
#, c-format
msgid "alert id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:108
#: cmd/dashboard/controller/alertrule.go:156
#: cmd/dashboard/controller/alertrule.go:176
#: cmd/dashboard/controller/controller.go:226
#: cmd/dashboard/controller/cron.go:58 cmd/dashboard/controller/cron.go:124
#: cmd/dashboard/controller/cron.go:136 cmd/dashboard/controller/cron.go:195
#: cmd/dashboard/controller/cron.go:224 cmd/dashboard/controller/ddns.go:131
#: cmd/dashboard/controller/ddns.go:192 cmd/dashboard/controller/fm.go:43
#: cmd/dashboard/controller/nat.go:59 cmd/dashboard/controller/nat.go:111
#: cmd/dashboard/controller/nat.go:122 cmd/dashboard/controller/nat.go:162
#: cmd/dashboard/controller/notification.go:112
#: cmd/dashboard/controller/notification.go:166
#: cmd/dashboard/controller/notification_group.go:76
#: cmd/dashboard/controller/notification_group.go:152
#: cmd/dashboard/controller/notification_group.go:164
#: cmd/dashboard/controller/notification_group.go:233
#: cmd/dashboard/controller/server.go:66 cmd/dashboard/controller/server.go:78
#: cmd/dashboard/controller/server.go:137
#: cmd/dashboard/controller/server.go:201
#: cmd/dashboard/controller/server_group.go:75
#: cmd/dashboard/controller/server_group.go:150
#: cmd/dashboard/controller/server_group.go:229
#: cmd/dashboard/controller/service.go:271
#: cmd/dashboard/controller/service.go:342
#: cmd/dashboard/controller/service.go:369
#: cmd/dashboard/controller/terminal.go:41
msgid "permission denied"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:184
msgid "duration need to be at least 3"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:188
msgid "cycle_interval need to be at least 1"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:191
msgid "cycle_start is not set"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:194
msgid "cycle_start is a future value"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:199
msgid "need to configure at least a single rule"
msgstr ""
#: cmd/dashboard/controller/controller.go:220
#: cmd/dashboard/controller/oauth2.go:153
#: cmd/dashboard/controller/server_group.go:162
#: cmd/dashboard/controller/service.go:97 cmd/dashboard/controller/user.go:27
#: cmd/dashboard/controller/user.go:63
msgid "unauthorized"
msgstr ""
#: cmd/dashboard/controller/controller.go:243
msgid "database error"
msgstr ""
#: cmd/dashboard/controller/cron.go:75 cmd/dashboard/controller/cron.go:149
msgid "scheduled tasks cannot be triggered by alarms"
msgstr ""
#: cmd/dashboard/controller/cron.go:132 cmd/dashboard/controller/cron.go:190
#, c-format
msgid "task id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/ddns.go:57 cmd/dashboard/controller/ddns.go:122
msgid "the retry count must be an integer between 1 and 10"
msgstr ""
#: cmd/dashboard/controller/ddns.go:81 cmd/dashboard/controller/ddns.go:154
msgid "error parsing %s: %v"
msgstr ""
#: cmd/dashboard/controller/ddns.go:127 cmd/dashboard/controller/nat.go:118
#, c-format
msgid "profile id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/fm.go:39 cmd/dashboard/controller/terminal.go:37
msgid "server not found or not connected"
msgstr ""
#: cmd/dashboard/controller/notification.go:69
#: cmd/dashboard/controller/notification.go:131
msgid "a test message"
msgstr ""
#: cmd/dashboard/controller/notification.go:108
#, c-format
msgid "notification id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:94
#: cmd/dashboard/controller/notification_group.go:175
msgid "have invalid notification id"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:160
#: cmd/dashboard/controller/server_group.go:158
#, c-format
msgid "group id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:42 cmd/dashboard/controller/oauth2.go:83
msgid "provider is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:52 cmd/dashboard/controller/oauth2.go:87
#: cmd/dashboard/controller/oauth2.go:132
msgid "provider not found"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:100
msgid "operation not permitted"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:138
msgid "code is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:175
msgid "oauth2 user not binded yet"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:217
#: cmd/dashboard/controller/oauth2.go:223
#: cmd/dashboard/controller/oauth2.go:228
msgid "invalid state key"
msgstr ""
#: cmd/dashboard/controller/server.go:74
#, c-format
msgid "server id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/server.go:250
msgid "operation timeout"
msgstr ""
#: cmd/dashboard/controller/server.go:257
msgid "get server config failed: %v"
msgstr ""
#: cmd/dashboard/controller/server.go:261
msgid "get server config failed"
msgstr ""
#: cmd/dashboard/controller/server_group.go:92
#: cmd/dashboard/controller/server_group.go:172
msgid "have invalid server id"
msgstr ""
#: cmd/dashboard/controller/service.go:90
#: cmd/dashboard/controller/service.go:165
msgid "server not found"
msgstr ""
#: cmd/dashboard/controller/service.go:267
#, c-format
msgid "service id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/user.go:68
msgid "incorrect password"
msgstr ""
#: cmd/dashboard/controller/user.go:82
msgid "you don't have any oauth2 bindings"
msgstr ""
#: cmd/dashboard/controller/user.go:131
msgid "password length must be greater than 6"
msgstr ""
#: cmd/dashboard/controller/user.go:134
msgid "username can't be empty"
msgstr ""
#: cmd/dashboard/controller/user.go:137
msgid "invalid role"
msgstr ""
#: cmd/dashboard/controller/user.go:176
msgid "can't delete yourself"
msgstr ""
#: service/rpc/io_stream.go:128
msgid "timeout: no connection established"
msgstr ""
#: service/rpc/io_stream.go:131
msgid "timeout: user connection not established"
msgstr ""
#: service/rpc/io_stream.go:134
msgid "timeout: agent connection not established"
msgstr ""
#: service/rpc/nezha.go:71
msgid "Scheduled Task Executed Successfully"
msgstr ""
#: service/rpc/nezha.go:75
msgid "Scheduled Task Executed Failed"
msgstr ""
#: service/rpc/nezha.go:274
msgid "IP Changed"
msgstr ""
#: service/singleton/alertsentinel.go:169
msgid "Incident"
msgstr ""
#: service/singleton/alertsentinel.go:179
msgid "Resolved"
msgstr ""
#: service/singleton/crontask.go:54
msgid "Tasks failed to register: ["
msgstr ""
#: service/singleton/crontask.go:61
msgid ""
"] These tasks will not execute properly. Fix them in the admin dashboard."
msgstr ""
#: service/singleton/crontask.go:144 service/singleton/crontask.go:169
#, c-format
msgid "[Task failed] %s: server %s is offline and cannot execute the task"
msgstr ""
#: service/singleton/servicesentinel.go:468
#, c-format
msgid "[Latency] %s %2f > %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:475
#, c-format
msgid "[Latency] %s %2f < %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:501
#, c-format
msgid "[%s] %s Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:544
#, c-format
msgid "[TLS] Fetch cert info failed, Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:584
#, c-format
msgid "The TLS certificate will expire within seven days. Expiration time: %s"
msgstr ""
#: service/singleton/servicesentinel.go:597
#, c-format
msgid ""
"TLS certificate changed, old: issuer %s, expires at %s; new: issuer %s, "
"expires at %s"
msgstr ""
#: service/singleton/servicesentinel.go:633
msgid "No Data"
msgstr ""
#: service/singleton/servicesentinel.go:635
msgid "Good"
msgstr ""
#: service/singleton/servicesentinel.go:637
msgid "Low Availability"
msgstr ""
#: service/singleton/servicesentinel.go:639
msgid "Down"
msgstr ""
#: service/singleton/user.go:60
msgid "user id not specified"
msgstr ""
Binary file not shown.
@@ -0,0 +1,313 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
# This file is distributed under the same license as the PACKAGE package.
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-01-30 21:58+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: Automatically generated\n"
"Language-Team: none\n"
"Language: uk\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=3; plural=n%10==1 && n%100!=11 ? 0 : n%10>=2 && "
"n%10<=4 && (n%100<10 || n%100>=20) ? 1 : 2;\n"
#: cmd/dashboard/controller/alertrule.go:104
#, c-format
msgid "alert id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:108
#: cmd/dashboard/controller/alertrule.go:156
#: cmd/dashboard/controller/alertrule.go:176
#: cmd/dashboard/controller/controller.go:226
#: cmd/dashboard/controller/cron.go:58 cmd/dashboard/controller/cron.go:124
#: cmd/dashboard/controller/cron.go:136 cmd/dashboard/controller/cron.go:195
#: cmd/dashboard/controller/cron.go:224 cmd/dashboard/controller/ddns.go:131
#: cmd/dashboard/controller/ddns.go:192 cmd/dashboard/controller/fm.go:43
#: cmd/dashboard/controller/nat.go:59 cmd/dashboard/controller/nat.go:111
#: cmd/dashboard/controller/nat.go:122 cmd/dashboard/controller/nat.go:162
#: cmd/dashboard/controller/notification.go:112
#: cmd/dashboard/controller/notification.go:166
#: cmd/dashboard/controller/notification_group.go:76
#: cmd/dashboard/controller/notification_group.go:152
#: cmd/dashboard/controller/notification_group.go:164
#: cmd/dashboard/controller/notification_group.go:233
#: cmd/dashboard/controller/server.go:66 cmd/dashboard/controller/server.go:78
#: cmd/dashboard/controller/server.go:137
#: cmd/dashboard/controller/server.go:201
#: cmd/dashboard/controller/server_group.go:75
#: cmd/dashboard/controller/server_group.go:150
#: cmd/dashboard/controller/server_group.go:229
#: cmd/dashboard/controller/service.go:271
#: cmd/dashboard/controller/service.go:342
#: cmd/dashboard/controller/service.go:369
#: cmd/dashboard/controller/terminal.go:41
msgid "permission denied"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:184
msgid "duration need to be at least 3"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:188
msgid "cycle_interval need to be at least 1"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:191
msgid "cycle_start is not set"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:194
msgid "cycle_start is a future value"
msgstr ""
#: cmd/dashboard/controller/alertrule.go:199
msgid "need to configure at least a single rule"
msgstr ""
#: cmd/dashboard/controller/controller.go:220
#: cmd/dashboard/controller/oauth2.go:153
#: cmd/dashboard/controller/server_group.go:162
#: cmd/dashboard/controller/service.go:97 cmd/dashboard/controller/user.go:27
#: cmd/dashboard/controller/user.go:63
msgid "unauthorized"
msgstr ""
#: cmd/dashboard/controller/controller.go:243
msgid "database error"
msgstr ""
#: cmd/dashboard/controller/cron.go:75 cmd/dashboard/controller/cron.go:149
msgid "scheduled tasks cannot be triggered by alarms"
msgstr ""
#: cmd/dashboard/controller/cron.go:132 cmd/dashboard/controller/cron.go:190
#, c-format
msgid "task id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/ddns.go:57 cmd/dashboard/controller/ddns.go:122
msgid "the retry count must be an integer between 1 and 10"
msgstr ""
#: cmd/dashboard/controller/ddns.go:81 cmd/dashboard/controller/ddns.go:154
msgid "error parsing %s: %v"
msgstr ""
#: cmd/dashboard/controller/ddns.go:127 cmd/dashboard/controller/nat.go:118
#, c-format
msgid "profile id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/fm.go:39 cmd/dashboard/controller/terminal.go:37
msgid "server not found or not connected"
msgstr ""
#: cmd/dashboard/controller/notification.go:69
#: cmd/dashboard/controller/notification.go:131
msgid "a test message"
msgstr ""
#: cmd/dashboard/controller/notification.go:108
#, c-format
msgid "notification id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:94
#: cmd/dashboard/controller/notification_group.go:175
msgid "have invalid notification id"
msgstr ""
#: cmd/dashboard/controller/notification_group.go:160
#: cmd/dashboard/controller/server_group.go:158
#, c-format
msgid "group id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:42 cmd/dashboard/controller/oauth2.go:83
msgid "provider is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:52 cmd/dashboard/controller/oauth2.go:87
#: cmd/dashboard/controller/oauth2.go:132
msgid "provider not found"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:100
msgid "operation not permitted"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:138
msgid "code is required"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:175
msgid "oauth2 user not binded yet"
msgstr ""
#: cmd/dashboard/controller/oauth2.go:217
#: cmd/dashboard/controller/oauth2.go:223
#: cmd/dashboard/controller/oauth2.go:228
msgid "invalid state key"
msgstr ""
#: cmd/dashboard/controller/server.go:74
#, c-format
msgid "server id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/server.go:250
msgid "operation timeout"
msgstr ""
#: cmd/dashboard/controller/server.go:257
msgid "get server config failed: %v"
msgstr ""
#: cmd/dashboard/controller/server.go:261
msgid "get server config failed"
msgstr ""
#: cmd/dashboard/controller/server_group.go:92
#: cmd/dashboard/controller/server_group.go:172
msgid "have invalid server id"
msgstr ""
#: cmd/dashboard/controller/service.go:90
#: cmd/dashboard/controller/service.go:165
msgid "server not found"
msgstr ""
#: cmd/dashboard/controller/service.go:267
#, c-format
msgid "service id %d does not exist"
msgstr ""
#: cmd/dashboard/controller/user.go:68
msgid "incorrect password"
msgstr ""
#: cmd/dashboard/controller/user.go:82
msgid "you don't have any oauth2 bindings"
msgstr ""
#: cmd/dashboard/controller/user.go:131
msgid "password length must be greater than 6"
msgstr ""
#: cmd/dashboard/controller/user.go:134
msgid "username can't be empty"
msgstr ""
#: cmd/dashboard/controller/user.go:137
msgid "invalid role"
msgstr ""
#: cmd/dashboard/controller/user.go:176
msgid "can't delete yourself"
msgstr ""
#: service/rpc/io_stream.go:128
msgid "timeout: no connection established"
msgstr ""
#: service/rpc/io_stream.go:131
msgid "timeout: user connection not established"
msgstr ""
#: service/rpc/io_stream.go:134
msgid "timeout: agent connection not established"
msgstr ""
#: service/rpc/nezha.go:71
msgid "Scheduled Task Executed Successfully"
msgstr ""
#: service/rpc/nezha.go:75
msgid "Scheduled Task Executed Failed"
msgstr ""
#: service/rpc/nezha.go:274
msgid "IP Changed"
msgstr ""
#: service/singleton/alertsentinel.go:169
msgid "Incident"
msgstr ""
#: service/singleton/alertsentinel.go:179
msgid "Resolved"
msgstr ""
#: service/singleton/crontask.go:54
msgid "Tasks failed to register: ["
msgstr ""
#: service/singleton/crontask.go:61
msgid ""
"] These tasks will not execute properly. Fix them in the admin dashboard."
msgstr ""
#: service/singleton/crontask.go:144 service/singleton/crontask.go:169
#, c-format
msgid "[Task failed] %s: server %s is offline and cannot execute the task"
msgstr ""
#: service/singleton/servicesentinel.go:468
#, c-format
msgid "[Latency] %s %2f > %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:475
#, c-format
msgid "[Latency] %s %2f < %2f, Reporter: %s"
msgstr ""
#: service/singleton/servicesentinel.go:501
#, c-format
msgid "[%s] %s Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:544
#, c-format
msgid "[TLS] Fetch cert info failed, Reporter: %s, Error: %s"
msgstr ""
#: service/singleton/servicesentinel.go:584
#, c-format
msgid "The TLS certificate will expire within seven days. Expiration time: %s"
msgstr ""
#: service/singleton/servicesentinel.go:597
#, c-format
msgid ""
"TLS certificate changed, old: issuer %s, expires at %s; new: issuer %s, "
"expires at %s"
msgstr ""
#: service/singleton/servicesentinel.go:633
msgid "No Data"
msgstr ""
#: service/singleton/servicesentinel.go:635
msgid "Good"
msgstr ""
#: service/singleton/servicesentinel.go:637
msgid "Low Availability"
msgstr ""
#: service/singleton/servicesentinel.go:639
msgid "Down"
msgstr ""
#: service/singleton/user.go:60
msgid "user id not specified"
msgstr ""
+67
View File
@@ -0,0 +1,67 @@
package tsdb
import "time"
// Config TSDB 配置选项
type Config struct {
// DataPath 数据存储路径,为空则不启用 TSDB
DataPath string `koanf:"data_path" json:"data_path,omitempty"`
// RetentionDays 数据保留天数,默认 30 天
RetentionDays uint16 `koanf:"retention_days" json:"retention_days,omitempty"`
// MinFreeDiskSpaceGB 最小磁盘剩余空间(GB),默认 1GB
// 当磁盘剩余空间低于此值时,TSDB 将停止接收新数据以防止磁盘耗尽
MinFreeDiskSpaceGB float64 `koanf:"min_free_disk_space_gb" json:"min_free_disk_space_gb,omitempty"`
// MaxMemoryMB 最大内存使用量(MB),默认 256MB,用于限制 VictoriaMetrics 缓存
MaxMemoryMB int64 `koanf:"max_memory_mb" json:"max_memory_mb,omitempty"`
// DedupInterval 去重间隔,默认 30 秒
DedupInterval time.Duration `koanf:"dedup_interval" json:"dedup_interval,omitempty"`
// WriteBufferSize 写入缓冲区大小,默认 512,达到此数量后批量写入
WriteBufferSize int `koanf:"write_buffer_size" json:"write_buffer_size,omitempty"`
// WriteBufferFlushInterval 写入缓冲区刷新间隔,默认 5 秒
WriteBufferFlushInterval time.Duration `koanf:"write_buffer_flush_interval" json:"write_buffer_flush_interval,omitempty"`
}
// DefaultConfig 返回默认配置(不设置 DataPath,需要显式配置才启用)
func DefaultConfig() *Config {
return &Config{
DataPath: "",
RetentionDays: 30,
MinFreeDiskSpaceGB: 1,
MaxMemoryMB: 256,
DedupInterval: 30 * time.Second,
WriteBufferSize: 512,
WriteBufferFlushInterval: 5 * time.Second,
}
}
// Validate 验证配置有效性并填充默认值
func (c *Config) Validate() {
if c.RetentionDays == 0 {
c.RetentionDays = 30
}
if c.MinFreeDiskSpaceGB <= 0 {
c.MinFreeDiskSpaceGB = 1
}
if c.MaxMemoryMB <= 0 {
c.MaxMemoryMB = 256
}
if c.DedupInterval <= 0 {
c.DedupInterval = 30 * time.Second
}
if c.WriteBufferSize <= 0 {
c.WriteBufferSize = 512
}
if c.WriteBufferFlushInterval <= 0 {
c.WriteBufferFlushInterval = 5 * time.Second
}
}
// Enabled 检查是否启用 TSDB
func (c *Config) Enabled() bool {
return c.DataPath != ""
}
// MinFreeDiskSpaceBytes 返回最小磁盘剩余空间(字节)
func (c *Config) MinFreeDiskSpaceBytes() int64 {
return int64(c.MinFreeDiskSpaceGB * 1024 * 1024 * 1024)
}
+17
View File
@@ -0,0 +1,17 @@
package tsdb
import (
"log"
)
// Maintenance asks the underlying storage to flush buffered data to disk.
// It is a no-op once the database has been closed.
func (db *TSDB) Maintenance() {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if !db.closed {
		log.Println("NEZHA>> TSDB starting maintenance (flush)...")
		db.storage.DebugFlush()
		log.Println("NEZHA>> TSDB maintenance completed")
	}
}
+664
View File
@@ -0,0 +1,664 @@
package tsdb
import (
"fmt"
"log"
"sort"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/nezhahq/nezha/model"
)
// QueryPeriod 查询时间段
type QueryPeriod string
const (
Period1Day QueryPeriod = "1d"
Period7Days QueryPeriod = "7d"
Period30Days QueryPeriod = "30d"
)
// ParseQueryPeriod 解析查询时间段
func ParseQueryPeriod(s string) (QueryPeriod, error) {
switch s {
case "1d", "":
return Period1Day, nil
case "7d":
return Period7Days, nil
case "30d":
return Period30Days, nil
default:
return "", fmt.Errorf("invalid period: %s, expected 1d, 7d, or 30d", s)
}
}
// Duration 返回时间段的时长
func (p QueryPeriod) Duration() time.Duration {
switch p {
case Period7Days:
return 7 * 24 * time.Hour
case Period30Days:
return 30 * 24 * time.Hour
default:
return 24 * time.Hour
}
}
// DownsampleInterval 返回降采样间隔
// 1d: 30秒一个点 (2880个点)
// 7d: 30分钟一个点 (336个点)
// 30d: 2小时一个点 (360个点)
func (p QueryPeriod) DownsampleInterval() time.Duration {
switch p {
case Period7Days:
return 30 * time.Minute
case Period30Days:
return 2 * time.Hour
default:
return 30 * time.Second
}
}
// Type aliases for model types used in the tsdb package, so code in this
// package can refer to them without repeating the model qualifier.
type (
	DataPoint             = model.DataPoint
	ServiceHistorySummary = model.ServiceHistorySummary
	ServerServiceStats    = model.ServerServiceStats
	ServiceHistoryResult  = model.ServiceHistoryResponse
	MetricDataPoint       = model.ServerMetricsDataPoint
)
// rawDataPoint is a merged sample for a single timestamp, combining a delay
// measurement and/or an up-down status value for one server.
type rawDataPoint struct {
	timestamp int64
	value     float64 // delay sample; meaningful only when hasDelay is true
	status    float64 // status sample; meaningful only when hasStatus is true
	hasDelay  bool
	hasStatus bool
}
// QueryServiceHistory returns per-server statistics for one service over the
// given period. It queries the delay and status metrics separately, merges
// them per server and per timestamp, downsamples according to the period,
// and returns the servers sorted by ID.
func (db *TSDB) QueryServiceHistory(serviceID uint64, period QueryPeriod) (*ServiceHistoryResult, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return nil, fmt.Errorf("TSDB is closed")
	}
	// Query window: [now - period, now], in milliseconds.
	now := time.Now()
	tr := storage.TimeRange{
		MinTimestamp: now.Add(-period.Duration()).UnixMilli(),
		MaxTimestamp: now.UnixMilli(),
	}
	serviceIDStr := strconv.FormatUint(serviceID, 10)
	delayData, err := db.queryMetricByServiceID(MetricServiceDelay, serviceIDStr, tr)
	if err != nil {
		return nil, fmt.Errorf("failed to query delay data: %w", err)
	}
	statusData, err := db.queryMetricByServiceID(MetricServiceStatus, serviceIDStr, tr)
	if err != nil {
		return nil, fmt.Errorf("failed to query status data: %w", err)
	}
	result := &ServiceHistoryResult{
		ServiceID: serviceID,
		Servers:   make([]ServerServiceStats, 0),
	}
	// serverDataMap: server ID -> (timestamp -> merged sample).
	serverDataMap := make(map[uint64]map[int64]*rawDataPoint)
	// First pass: seed the map with delay samples.
	for serverID, points := range delayData {
		if serverDataMap[serverID] == nil {
			serverDataMap[serverID] = make(map[int64]*rawDataPoint)
		}
		for _, p := range points {
			serverDataMap[serverID][p.timestamp] = &rawDataPoint{
				timestamp: p.timestamp,
				value:     p.value,
				hasDelay:  true,
			}
		}
	}
	// Second pass: merge status samples into existing entries where the
	// timestamp matches, otherwise create status-only entries.
	for serverID, points := range statusData {
		if serverDataMap[serverID] == nil {
			serverDataMap[serverID] = make(map[int64]*rawDataPoint)
		}
		for _, p := range points {
			if existing, ok := serverDataMap[serverID][p.timestamp]; ok {
				existing.status = p.value
				existing.hasStatus = true
			} else {
				serverDataMap[serverID][p.timestamp] = &rawDataPoint{
					timestamp: p.timestamp,
					status:    p.value,
					hasStatus: true,
				}
			}
		}
	}
	// Flatten each server's samples and downsample into summary stats.
	for serverID, pointsMap := range serverDataMap {
		points := make([]rawDataPoint, 0, len(pointsMap))
		for _, p := range pointsMap {
			points = append(points, *p)
		}
		stats := calculateStats(points, period.DownsampleInterval())
		result.Servers = append(result.Servers, ServerServiceStats{
			ServerID: serverID,
			Stats:    stats,
		})
	}
	// Map iteration order is random; sort for a deterministic response.
	sort.Slice(result.Servers, func(i, j int) bool {
		return result.Servers[i].ServerID < result.Servers[j].ServerID
	})
	return result, nil
}
// DailyServiceStats holds one day's aggregated results for a service.
type DailyServiceStats struct {
	Up    uint64  // count of samples reported as up (status >= 0.5)
	Down  uint64  // count of samples reported as down
	Delay float64 // mean delay across that day's samples
}
// QueryServiceDailyStats returns per-day up/down counts and average delay
// for one service over the last `days` days. Index 0 is the oldest day and
// index days-1 corresponds to `today`.
func (db *TSDB) QueryServiceDailyStats(serviceID uint64, today time.Time, days int) ([]DailyServiceStats, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return nil, fmt.Errorf("TSDB is closed")
	}
	stats := make([]DailyServiceStats, days)
	serviceIDStr := strconv.FormatUint(serviceID, 10)
	start := today.AddDate(0, 0, -(days - 1))
	tr := storage.TimeRange{
		MinTimestamp: start.UnixMilli(),
		MaxTimestamp: today.UnixMilli(),
	}
	statusData, err := db.queryMetricByServiceID(MetricServiceStatus, serviceIDStr, tr)
	if err != nil {
		return nil, err
	}
	delayData, err := db.queryMetricByServiceID(MetricServiceDelay, serviceIDStr, tr)
	if err != nil {
		return nil, err
	}
	for _, points := range statusData {
		for _, p := range points {
			ts := time.UnixMilli(p.timestamp)
			// Day buckets are 24h wall-clock windows counted back from
			// `today`, not calendar days. NOTE(review): this may be off by
			// an hour across DST transitions — confirm acceptable.
			dayIndex := (days - 1) - int(today.Sub(ts).Hours())/24
			if dayIndex < 0 || dayIndex >= days {
				continue
			}
			// Status samples are written as 1 (up) / 0 (down); 0.5 splits them.
			if p.value >= 0.5 {
				stats[dayIndex].Up++
			} else {
				stats[dayIndex].Down++
			}
		}
	}
	// Running (incremental) mean of delay per day bucket.
	delayCount := make([]int, days)
	for _, points := range delayData {
		for _, p := range points {
			ts := time.UnixMilli(p.timestamp)
			dayIndex := (days - 1) - int(today.Sub(ts).Hours())/24
			if dayIndex < 0 || dayIndex >= days {
				continue
			}
			stats[dayIndex].Delay = (stats[dayIndex].Delay*float64(delayCount[dayIndex]) + p.value) / float64(delayCount[dayIndex]+1)
			delayCount[dayIndex]++
		}
	}
	return stats, nil
}
// metricPoint is a single raw sample read back from storage.
type metricPoint struct {
	timestamp int64   // sample time in Unix milliseconds
	value     float64 // sample value
}
// queryMetricByServiceID fetches all samples of `metric` for one service
// within tr, grouped by the server_id tag of each matching series.
func (db *TSDB) queryMetricByServiceID(metric MetricType, serviceID string, tr storage.TimeRange) (map[uint64][]metricPoint, error) {
	tfs := storage.NewTagFilters()
	// A nil tag key filters on the metric name (__name__).
	if err := tfs.Add(nil, []byte(metric), false, false); err != nil {
		return nil, err
	}
	if err := tfs.Add([]byte("service_id"), []byte(serviceID), false, false); err != nil {
		return nil, err
	}
	// Abort the search if it runs longer than 30 seconds.
	deadline := uint64(time.Now().Add(30 * time.Second).Unix())
	var search storage.Search
	search.Init(nil, db.storage, []*storage.TagFilters{tfs}, tr, 100000, deadline)
	defer search.MustClose()
	result := make(map[uint64][]metricPoint)
	// Scratch buffers reused across blocks to avoid per-block allocation.
	var timestamps []int64
	var values []float64
	for search.NextMetricBlock() {
		mbr := search.MetricBlockRef
		var block storage.Block
		mbr.BlockRef.MustReadBlock(&block)
		// MetricName objects are pooled: every early-continue path below
		// must return mn via storage.PutMetricName first.
		mn := storage.GetMetricName()
		if err := mn.Unmarshal(mbr.MetricName); err != nil {
			log.Printf("NEZHA>> TSDB: failed to unmarshal metric name: %v", err)
			storage.PutMetricName(mn)
			continue
		}
		serverIDBytes := mn.GetTagValue("server_id")
		if len(serverIDBytes) == 0 {
			storage.PutMetricName(mn)
			continue
		}
		serverID, err := strconv.ParseUint(string(serverIDBytes), 10, 64)
		if err != nil {
			log.Printf("NEZHA>> TSDB: failed to parse server_id %q: %v", string(serverIDBytes), err)
			storage.PutMetricName(mn)
			continue
		}
		storage.PutMetricName(mn)
		if err := block.UnmarshalData(); err != nil {
			log.Printf("NEZHA>> TSDB: failed to unmarshal block data: %v", err)
			continue
		}
		timestamps = timestamps[:0]
		values = values[:0]
		timestamps, values = block.AppendRowsWithTimeRangeFilter(timestamps, values, tr)
		for i := range timestamps {
			result[serverID] = append(result[serverID], metricPoint{
				timestamp: timestamps[i],
				value:     values[i],
			})
		}
	}
	if err := search.Error(); err != nil {
		return nil, err
	}
	return result, nil
}
// calculateStats aggregates raw sample points into a summary: total up/down
// counts, average delay, uptime percentage, and downsampled chart points.
// The points slice is sorted in place as a side effect.
func calculateStats(points []rawDataPoint, downsampleInterval time.Duration) ServiceHistorySummary {
	if len(points) == 0 {
		return ServiceHistorySummary{}
	}
	// Order by time so the downsampled series comes out chronological.
	sort.Slice(points, func(a, b int) bool {
		return points[a].timestamp < points[b].timestamp
	})

	var delaySum float64
	var delaySeen int
	var up, down uint64
	for i := range points {
		if points[i].hasDelay {
			delaySum += points[i].value
			delaySeen++
		}
		if points[i].hasStatus {
			if points[i].status >= 0.5 {
				up++
			} else {
				down++
			}
		}
	}

	out := ServiceHistorySummary{
		TotalUp:   up,
		TotalDown: down,
	}
	if delaySeen > 0 {
		out.AvgDelay = delaySum / float64(delaySeen)
	}
	if total := up + down; total > 0 {
		out.UpPercent = float32(up) / float32(total) * 100
	}
	out.DataPoints = downsample(points, downsampleInterval)
	return out
}
// downsample groups pre-sorted points into fixed-width time buckets,
// averaging delay and majority-voting status within each bucket.
func downsample(points []rawDataPoint, interval time.Duration) []DataPoint {
	if len(points) == 0 {
		return nil
	}
	intervalMs := interval.Milliseconds()
	result := make([]DataPoint, 0)
	// Points are already sorted; scan consecutive runs with the same bucket key.
	for i := 0; i < len(points); {
		bucketStart := points[i].timestamp / intervalMs * intervalMs
		var delaySum float64
		delayN, upN, statusN := 0, 0, 0
		j := i
		for ; j < len(points) && points[j].timestamp/intervalMs*intervalMs == bucketStart; j++ {
			if points[j].hasDelay {
				delaySum += points[j].value
				delayN++
			}
			if points[j].hasStatus {
				statusN++
				if points[j].status >= 0.5 {
					upN++
				}
			}
		}
		dp := DataPoint{Timestamp: bucketStart}
		if delayN > 0 {
			dp.Delay = delaySum / float64(delayN)
		}
		// Bucket counts as "up" when strictly more than half its status
		// samples are up.
		if statusN > 0 && upN > statusN/2 {
			dp.Status = 1
		}
		result = append(result, dp)
		i = j
	}
	return result
}
// downsampleMetrics groups points into fixed-width time buckets. For
// cumulative metrics (useLastValue) each bucket keeps its last sample;
// otherwise the bucket average is used. Sorts points in place.
func downsampleMetrics(points []rawDataPoint, interval time.Duration, useLastValue bool) []MetricDataPoint {
	if len(points) == 0 {
		return nil
	}
	sort.Slice(points, func(a, b int) bool {
		return points[a].timestamp < points[b].timestamp
	})
	intervalMs := interval.Milliseconds()
	result := make([]MetricDataPoint, 0)
	for i := 0; i < len(points); {
		bucketStart := points[i].timestamp / intervalMs * intervalMs
		var sum float64
		n := 0
		j := i
		for ; j < len(points) && points[j].timestamp/intervalMs*intervalMs == bucketStart; j++ {
			sum += points[j].value
			n++
		}
		var v float64
		if useLastValue {
			v = points[j-1].value // last sample in the bucket
		} else if n > 0 {
			v = sum / float64(n)
		}
		result = append(result, MetricDataPoint{Timestamp: bucketStart, Value: v})
		i = j
	}
	return result
}
// isCumulativeMetric reports whether the metric is cumulative
// (monotonically increasing): transfer counters and uptime.
func isCumulativeMetric(metric MetricType) bool {
	return metric == MetricServerNetInTransfer ||
		metric == MetricServerNetOutTransfer ||
		metric == MetricServerUptime
}
// QueryServerMetrics returns the downsampled time series of one metric for
// one server over the given period. Cumulative metrics keep the last value
// per bucket; gauges are averaged.
func (db *TSDB) QueryServerMetrics(serverID uint64, metric MetricType, period QueryPeriod) ([]MetricDataPoint, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return nil, fmt.Errorf("TSDB is closed")
	}
	now := time.Now()
	tr := storage.TimeRange{
		MinTimestamp: now.Add(-period.Duration()).UnixMilli(),
		MaxTimestamp: now.UnixMilli(),
	}
	serverIDStr := strconv.FormatUint(serverID, 10)
	tfs := storage.NewTagFilters()
	// A nil tag key filters on the metric name (__name__).
	if err := tfs.Add(nil, []byte(metric), false, false); err != nil {
		return nil, err
	}
	if err := tfs.Add([]byte("server_id"), []byte(serverIDStr), false, false); err != nil {
		return nil, err
	}
	// Abort the search if it runs longer than 30 seconds.
	deadline := uint64(time.Now().Add(30 * time.Second).Unix())
	var search storage.Search
	search.Init(nil, db.storage, []*storage.TagFilters{tfs}, tr, 100000, deadline)
	defer search.MustClose()
	var points []rawDataPoint
	// Scratch buffers reused across blocks to avoid per-block allocation.
	var timestamps []int64
	var values []float64
	for search.NextMetricBlock() {
		mbr := search.MetricBlockRef
		var block storage.Block
		mbr.BlockRef.MustReadBlock(&block)
		if err := block.UnmarshalData(); err != nil {
			log.Printf("NEZHA>> TSDB: failed to unmarshal block data: %v", err)
			continue
		}
		timestamps = timestamps[:0]
		values = values[:0]
		timestamps, values = block.AppendRowsWithTimeRangeFilter(timestamps, values, tr)
		for i := range timestamps {
			points = append(points, rawDataPoint{
				timestamp: timestamps[i],
				value:     values[i],
			})
		}
	}
	if err := search.Error(); err != nil {
		return nil, err
	}
	return downsampleMetrics(points, period.DownsampleInterval(), isCumulativeMetric(metric)), nil
}
// QueryServiceHistoryByServerID returns, for every service probed from the
// given server, its availability/delay statistics over the period, keyed by
// service ID.
func (db *TSDB) QueryServiceHistoryByServerID(serverID uint64, period QueryPeriod) (map[uint64]*ServiceHistoryResult, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return nil, fmt.Errorf("TSDB is closed")
	}

	now := time.Now()
	tr := storage.TimeRange{
		MinTimestamp: now.Add(-period.Duration()).UnixMilli(),
		MaxTimestamp: now.UnixMilli(),
	}

	sid := strconv.FormatUint(serverID, 10)
	delayData, err := db.queryMetricByServerID(MetricServiceDelay, sid, tr)
	if err != nil {
		return nil, fmt.Errorf("failed to query delay data: %w", err)
	}
	statusData, err := db.queryMetricByServerID(MetricServiceStatus, sid, tr)
	if err != nil {
		return nil, fmt.Errorf("failed to query status data: %w", err)
	}

	// Merge delay and status samples per service, keyed by timestamp.
	merged := make(map[uint64]map[int64]*rawDataPoint)
	bucket := func(serviceID uint64) map[int64]*rawDataPoint {
		m, ok := merged[serviceID]
		if !ok {
			m = make(map[int64]*rawDataPoint)
			merged[serviceID] = m
		}
		return m
	}
	for serviceID, points := range delayData {
		m := bucket(serviceID)
		for _, p := range points {
			m[p.timestamp] = &rawDataPoint{timestamp: p.timestamp, value: p.value, hasDelay: true}
		}
	}
	for serviceID, points := range statusData {
		m := bucket(serviceID)
		for _, p := range points {
			if dp, ok := m[p.timestamp]; ok {
				dp.status = p.value
				dp.hasStatus = true
			} else {
				m[p.timestamp] = &rawDataPoint{timestamp: p.timestamp, status: p.value, hasStatus: true}
			}
		}
	}

	results := make(map[uint64]*ServiceHistoryResult)
	for serviceID, m := range merged {
		points := make([]rawDataPoint, 0, len(m))
		for _, dp := range m {
			points = append(points, *dp)
		}
		// Each result carries a single server entry: this server.
		results[serviceID] = &ServiceHistoryResult{
			ServiceID: serviceID,
			Servers: []ServerServiceStats{{
				ServerID: serverID,
				Stats:    calculateStats(points, period.DownsampleInterval()),
			}},
		}
	}
	return results, nil
}
// queryMetricByServerID fetches all samples of `metric` reported by one
// server within tr, grouped by the service_id tag of each matching series.
func (db *TSDB) queryMetricByServerID(metric MetricType, serverID string, tr storage.TimeRange) (map[uint64][]metricPoint, error) {
	tfs := storage.NewTagFilters()
	// A nil tag key filters on the metric name (__name__).
	if err := tfs.Add(nil, []byte(metric), false, false); err != nil {
		return nil, err
	}
	if err := tfs.Add([]byte("server_id"), []byte(serverID), false, false); err != nil {
		return nil, err
	}
	// Abort the search if it runs longer than 30 seconds.
	deadline := uint64(time.Now().Add(30 * time.Second).Unix())
	var search storage.Search
	search.Init(nil, db.storage, []*storage.TagFilters{tfs}, tr, 100000, deadline)
	defer search.MustClose()
	result := make(map[uint64][]metricPoint)
	// Scratch buffers reused across blocks to avoid per-block allocation.
	var timestamps []int64
	var values []float64
	for search.NextMetricBlock() {
		mbr := search.MetricBlockRef
		var block storage.Block
		mbr.BlockRef.MustReadBlock(&block)
		// MetricName objects are pooled: every early-continue path below
		// must return mn via storage.PutMetricName first.
		mn := storage.GetMetricName()
		if err := mn.Unmarshal(mbr.MetricName); err != nil {
			log.Printf("NEZHA>> TSDB: failed to unmarshal metric name: %v", err)
			storage.PutMetricName(mn)
			continue
		}
		serviceIDBytes := mn.GetTagValue("service_id")
		if len(serviceIDBytes) == 0 {
			storage.PutMetricName(mn)
			continue
		}
		serviceID, err := strconv.ParseUint(string(serviceIDBytes), 10, 64)
		if err != nil {
			log.Printf("NEZHA>> TSDB: failed to parse service_id %q: %v", string(serviceIDBytes), err)
			storage.PutMetricName(mn)
			continue
		}
		storage.PutMetricName(mn)
		if err := block.UnmarshalData(); err != nil {
			log.Printf("NEZHA>> TSDB: failed to unmarshal block data: %v", err)
			continue
		}
		timestamps = timestamps[:0]
		values = values[:0]
		timestamps, values = block.AppendRowsWithTimeRangeFilter(timestamps, values, tr)
		for i := range timestamps {
			result[serviceID] = append(result[serviceID], metricPoint{
				timestamp: timestamps[i],
				value:     values[i],
			})
		}
	}
	if err := search.Error(); err != nil {
		return nil, err
	}
	return result, nil
}
+117
View File
@@ -0,0 +1,117 @@
package tsdb
import (
"fmt"
"log"
"path/filepath"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
// TSDB wraps VictoriaMetrics storage for server/service metrics.
type TSDB struct {
	storage *storage.Storage // underlying VictoriaMetrics storage engine
	config  *Config          // configuration this instance was opened with
	mu      sync.RWMutex     // guards closed; read-held during queries/writes
	closed  bool             // set by Close; rejects further operations
	writer  *bufferedWriter  // batches rows before handing them to storage
}
// InitGlobalSettings initializes VictoriaMetrics package-level settings.
// These are process-wide: call once, before Open().
func InitGlobalSettings(config *Config) {
	memBytes := int(config.MaxMemoryMB * 1024 * 1024)
	// Split the memory budget across VM caches: 35% TSID, 10% metric
	// names, 5% tag filters, 1% metadata.
	storage.SetTSIDCacheSize(memBytes * 35 / 100)
	storage.SetMetricNameCacheSize(memBytes * 10 / 100)
	storage.SetTagFiltersCacheSize(memBytes * 5 / 100)
	storage.SetMetadataStorageSize(memBytes * 1 / 100)
	storage.SetDedupInterval(config.DedupInterval)
	storage.SetFreeDiskSpaceLimit(config.MinFreeDiskSpaceBytes())
	storage.SetDataFlushInterval(5 * time.Second)
}
// Open opens (or creates) the TSDB storage at config.DataPath and starts
// the buffered writer. A nil config falls back to DefaultConfig().
func Open(config *Config) (*TSDB, error) {
	if config == nil {
		config = DefaultConfig()
	}
	// NOTE(review): Validate's result (if it returns one) is not checked
	// here — presumably it normalizes config in place; confirm.
	config.Validate()
	dataPath := config.DataPath
	// VictoriaMetrics expects an absolute data directory.
	if !filepath.IsAbs(dataPath) {
		absPath, err := filepath.Abs(dataPath)
		if err != nil {
			return nil, fmt.Errorf("failed to get absolute path: %w", err)
		}
		dataPath = absPath
	}
	InitGlobalSettings(config)
	opts := storage.OpenOptions{
		Retention: time.Duration(config.RetentionDays) * 24 * time.Hour,
	}
	stor := storage.MustOpenStorage(dataPath, opts)
	db := &TSDB{
		storage: stor,
		config:  config,
	}
	db.writer = newBufferedWriter(db, config.WriteBufferSize, config.WriteBufferFlushInterval)
	log.Printf("NEZHA>> TSDB opened at %s, retention: %d days, min free disk: %.1f GB, max memory: %d MB",
		dataPath, config.RetentionDays, config.MinFreeDiskSpaceGB, config.MaxMemoryMB)
	return db, nil
}
// Close stops the buffered writer (flushing pending rows) and closes the
// underlying storage. Calling Close more than once is safe.
func (db *TSDB) Close() error {
	db.mu.Lock()
	defer db.mu.Unlock()
	if db.closed {
		return nil // already closed; no-op
	}
	if w := db.writer; w != nil {
		w.stop()
	}
	db.storage.MustClose()
	db.closed = true
	log.Println("NEZHA>> TSDB closed")
	return nil
}
// Storage returns the underlying storage object (for advanced queries).
func (db *TSDB) Storage() *storage.Storage {
	return db.storage
}
// Config returns the configuration this TSDB was opened with.
func (db *TSDB) Config() *Config {
	return db.config
}
// IsClosed reports whether the TSDB has been closed.
func (db *TSDB) IsClosed() bool {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.closed
}
// Flush forces buffered rows into storage and flushes storage to disk so
// the data becomes visible to queries (mainly used by tests).
func (db *TSDB) Flush() {
	if db.writer != nil {
		db.writer.flush()
	}
	db.storage.DebugFlush()
}
+622
View File
@@ -0,0 +1,622 @@
package tsdb
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConfig_Defaults verifies DefaultConfig's out-of-the-box values.
func TestConfig_Defaults(t *testing.T) {
	config := DefaultConfig()
	assert.Equal(t, "", config.DataPath) // empty by default: TSDB disabled
	assert.Equal(t, uint16(30), config.RetentionDays)
	assert.Equal(t, float64(1), config.MinFreeDiskSpaceGB)
	assert.Equal(t, 30*time.Second, config.DedupInterval)
	assert.False(t, config.Enabled())
}
// TestConfig_Enabled verifies that a non-empty DataPath enables the TSDB.
func TestConfig_Enabled(t *testing.T) {
	config := &Config{DataPath: ""}
	assert.False(t, config.Enabled())
	config.DataPath = "data/tsdb"
	assert.True(t, config.Enabled())
}
// TestConfig_MinFreeDiskSpaceBytes verifies the GB-to-bytes conversion.
func TestConfig_MinFreeDiskSpaceBytes(t *testing.T) {
	config := &Config{MinFreeDiskSpaceGB: 5}
	expected := int64(5 * 1024 * 1024 * 1024)
	assert.Equal(t, expected, config.MinFreeDiskSpaceBytes())
}
// TestTSDB_OpenClose verifies the open/close lifecycle and accessors.
func TestTSDB_OpenClose(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	require.NotNil(t, db)
	assert.False(t, db.IsClosed())
	assert.NotNil(t, db.Storage())
	assert.Equal(t, config, db.Config())
	err = db.Close()
	require.NoError(t, err)
	assert.True(t, db.IsClosed())
	// Closing an already-closed DB must be safe.
	err = db.Close()
	require.NoError(t, err)
}
// TestTSDB_WriteServerMetrics verifies writing a fully-populated server
// metrics sample succeeds.
func TestTSDB_WriteServerMetrics(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	metrics := &ServerMetrics{
		ServerID:       1,
		Timestamp:      time.Now(),
		CPU:            50.5,
		MemUsed:        1024 * 1024 * 1024,
		SwapUsed:       512 * 1024 * 1024,
		DiskUsed:       10 * 1024 * 1024 * 1024,
		NetInSpeed:     1000000,
		NetOutSpeed:    500000,
		NetInTransfer:  1000000000,
		NetOutTransfer: 500000000,
		Load1:          1.5,
		Load5:          1.2,
		Load15:         1.0,
		TCPConnCount:   100,
		UDPConnCount:   50,
		ProcessCount:   200,
		Temperature:    65.5,
		Uptime:         86400,
		GPU:            30.0,
	}
	err = db.WriteServerMetrics(metrics)
	require.NoError(t, err)
	// Force flush to disk.
	db.Flush()
}
// TestTSDB_WriteServiceMetrics verifies writing both successful and failed
// service probe samples.
func TestTSDB_WriteServiceMetrics(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	metrics := &ServiceMetrics{
		ServiceID:  1,
		ServerID:   1,
		Timestamp:  time.Now(),
		Delay:      45.5,
		Successful: true,
	}
	err = db.WriteServiceMetrics(metrics)
	require.NoError(t, err)
	// Also record a failed probe.
	metrics2 := &ServiceMetrics{
		ServiceID:  1,
		ServerID:   2,
		Timestamp:  time.Now(),
		Delay:      0,
		Successful: false,
	}
	err = db.WriteServiceMetrics(metrics2)
	require.NoError(t, err)
	// Force flush to disk.
	db.Flush()
}
// TestTSDB_WriteBatchMetrics verifies the batch write APIs for both server
// and service metrics.
func TestTSDB_WriteBatchMetrics(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	// Batch-write server metrics.
	serverMetrics := []*ServerMetrics{
		{ServerID: 1, Timestamp: time.Now(), CPU: 10.0},
		{ServerID: 2, Timestamp: time.Now(), CPU: 20.0},
		{ServerID: 3, Timestamp: time.Now(), CPU: 30.0},
	}
	err = db.WriteBatchServerMetrics(serverMetrics)
	require.NoError(t, err)
	// Batch-write service metrics.
	serviceMetrics := []*ServiceMetrics{
		{ServiceID: 1, ServerID: 1, Timestamp: time.Now(), Delay: 10.0, Successful: true},
		{ServiceID: 1, ServerID: 2, Timestamp: time.Now(), Delay: 20.0, Successful: true},
		{ServiceID: 2, ServerID: 1, Timestamp: time.Now(), Delay: 15.0, Successful: false},
	}
	err = db.WriteBatchServiceMetrics(serviceMetrics)
	require.NoError(t, err)
	db.Flush()
}
// TestTSDB_WriteToClosedDB verifies that writes after Close fail.
func TestTSDB_WriteToClosedDB(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	db.Close()
	// Writing to a closed database must return an error.
	err = db.WriteServerMetrics(&ServerMetrics{ServerID: 1, Timestamp: time.Now()})
	assert.Error(t, err)
	err = db.WriteServiceMetrics(&ServiceMetrics{ServiceID: 1, ServerID: 1, Timestamp: time.Now()})
	assert.Error(t, err)
}
// TestQueryPeriod_Parse table-tests ParseQueryPeriod, including the default
// for an empty string and rejection of unknown values.
func TestQueryPeriod_Parse(t *testing.T) {
	tests := []struct {
		input    string
		expected QueryPeriod
		hasError bool
	}{
		{"1d", Period1Day, false},
		{"7d", Period7Days, false},
		{"30d", Period30Days, false},
		{"", Period1Day, false}, // empty input defaults to 1 day
		{"invalid", "", true},
		{"1w", "", true},
	}
	for _, tt := range tests {
		t.Run(tt.input, func(t *testing.T) {
			period, err := ParseQueryPeriod(tt.input)
			if tt.hasError {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, tt.expected, period)
			}
		})
	}
}
// TestQueryPeriod_Duration verifies the wall-clock span of each period.
func TestQueryPeriod_Duration(t *testing.T) {
	assert.Equal(t, 24*time.Hour, Period1Day.Duration())
	assert.Equal(t, 7*24*time.Hour, Period7Days.Duration())
	assert.Equal(t, 30*24*time.Hour, Period30Days.Duration())
}
// TestQueryPeriod_DownsampleInterval verifies the bucket width per period.
func TestQueryPeriod_DownsampleInterval(t *testing.T) {
	assert.Equal(t, 30*time.Second, Period1Day.DownsampleInterval())
	assert.Equal(t, 30*time.Minute, Period7Days.DownsampleInterval())
	assert.Equal(t, 2*time.Hour, Period30Days.DownsampleInterval())
}
// TestTSDB_QueryServiceHistory writes probe data for one service from two
// servers and verifies the per-server up/down statistics.
func TestTSDB_QueryServiceHistory(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	// Write test data.
	now := time.Now()
	serviceID := uint64(100)
	serverID1 := uint64(1)
	serverID2 := uint64(2)
	// Write several service probe samples.
	for i := 0; i < 10; i++ {
		ts := now.Add(-time.Duration(i) * time.Minute)
		// Server 1: all probes successful.
		err := db.WriteServiceMetrics(&ServiceMetrics{
			ServiceID:  serviceID,
			ServerID:   serverID1,
			Timestamp:  ts,
			Delay:      float64(10 + i),
			Successful: true,
		})
		require.NoError(t, err)
		// Server 2: partially failing.
		err = db.WriteServiceMetrics(&ServiceMetrics{
			ServiceID:  serviceID,
			ServerID:   serverID2,
			Timestamp:  ts,
			Delay:      float64(20 + i),
			Successful: i%2 == 0, // even iterations succeed, odd ones fail
		})
		require.NoError(t, err)
	}
	// Force flush so the data is visible to queries.
	db.Flush()
	// Query the service history.
	result, err := db.QueryServiceHistory(serviceID, Period1Day)
	require.NoError(t, err)
	require.NotNil(t, result)
	assert.Equal(t, serviceID, result.ServiceID)
	require.Len(t, result.Servers, 2, "expected 2 servers")
	// Verify the per-server statistics.
	for _, server := range result.Servers {
		if server.ServerID == serverID1 {
			// Server 1: every probe succeeded.
			assert.Equal(t, uint64(10), server.Stats.TotalUp)
			assert.Equal(t, uint64(0), server.Stats.TotalDown)
			assert.Equal(t, float32(100), server.Stats.UpPercent)
		} else if server.ServerID == serverID2 {
			// Server 2: half of the probes succeeded.
			assert.Equal(t, uint64(5), server.Stats.TotalUp)
			assert.Equal(t, uint64(5), server.Stats.TotalDown)
			assert.Equal(t, float32(50), server.Stats.UpPercent)
		}
	}
}
// TestTSDB_QueryServerMetrics writes CPU samples and verifies the query
// returns a non-empty downsampled series.
func TestTSDB_QueryServerMetrics(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	// Write test data.
	now := time.Now()
	serverID := uint64(1)
	for i := 0; i < 10; i++ {
		ts := now.Add(-time.Duration(i) * time.Minute)
		err := db.WriteServerMetrics(&ServerMetrics{
			ServerID:  serverID,
			Timestamp: ts,
			CPU:       float64(10 + i*5),
		})
		require.NoError(t, err)
	}
	// Force flush so the data is visible to queries.
	db.Flush()
	// Query the server metrics.
	result, err := db.QueryServerMetrics(serverID, MetricServerCPU, Period1Day)
	require.NoError(t, err)
	require.NotEmpty(t, result, "expected data points")
}
// TestTSDB_QueryEmptyResult verifies queries for nonexistent IDs return
// empty results rather than errors.
func TestTSDB_QueryEmptyResult(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	// Query history of a service that does not exist.
	result, err := db.QueryServiceHistory(9999, Period1Day)
	require.NoError(t, err)
	require.NotNil(t, result)
	assert.Empty(t, result.Servers)
	// Query metrics of a server that does not exist.
	serverResult, err := db.QueryServerMetrics(9999, MetricServerCPU, Period1Day)
	require.NoError(t, err)
	assert.Empty(t, serverResult)
}
// TestTSDB_QueryClosedDB verifies that queries after Close fail.
func TestTSDB_QueryClosedDB(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	db.Close()
	// Querying a closed database must return an error.
	_, err = db.QueryServiceHistory(1, Period1Day)
	assert.Error(t, err)
	_, err = db.QueryServerMetrics(1, MetricServerCPU, Period1Day)
	assert.Error(t, err)
}
// TestDownsample verifies bucket count and strictly increasing timestamps
// when downsampling 5 one-second samples into 2-second buckets.
func TestDownsample(t *testing.T) {
	points := []rawDataPoint{
		{timestamp: 0, value: 10, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 1000, value: 20, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 2000, value: 30, status: 0, hasDelay: true, hasStatus: true},
		{timestamp: 3000, value: 40, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 4000, value: 50, status: 1, hasDelay: true, hasStatus: true},
	}
	result := downsample(points, 2*time.Second)
	assert.Len(t, result, 3)
	for i := 1; i < len(result); i++ {
		assert.Greater(t, result[i].Timestamp, result[i-1].Timestamp)
	}
}
// TestCalculateStats verifies up/down totals, uptime percent and average
// delay over a mixed sample set.
func TestCalculateStats(t *testing.T) {
	points := []rawDataPoint{
		{timestamp: 1000, value: 10, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 2000, value: 20, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 3000, value: 30, status: 0, hasDelay: true, hasStatus: true},
		{timestamp: 4000, value: 40, status: 1, hasDelay: true, hasStatus: true},
	}
	stats := calculateStats(points, 5*time.Minute)
	assert.Equal(t, uint64(3), stats.TotalUp)
	assert.Equal(t, uint64(1), stats.TotalDown)
	assert.Equal(t, float32(75), stats.UpPercent)
	assert.Equal(t, float64(25), stats.AvgDelay)
}
// TestCalculateStats_ZeroDelay verifies that a zero-delay sample still
// counts toward the average (0 is a valid delay, not "missing").
func TestCalculateStats_ZeroDelay(t *testing.T) {
	points := []rawDataPoint{
		{timestamp: 1000, value: 0, status: 1, hasDelay: true, hasStatus: true},
		{timestamp: 2000, value: 10, status: 1, hasDelay: true, hasStatus: true},
	}
	stats := calculateStats(points, 5*time.Minute)
	assert.Equal(t, float64(5), stats.AvgDelay)
	assert.Equal(t, uint64(2), stats.TotalUp)
}
// TestCalculateStatsEmpty verifies the zero-value summary for no input.
func TestCalculateStatsEmpty(t *testing.T) {
	points := []rawDataPoint{}
	stats := calculateStats(points, 5*time.Minute)
	assert.Equal(t, uint64(0), stats.TotalUp)
	assert.Equal(t, uint64(0), stats.TotalDown)
	assert.Equal(t, float32(0), stats.UpPercent)
	assert.Equal(t, float64(0), stats.AvgDelay)
	assert.Nil(t, stats.DataPoints)
}
// TestTSDB_QueryServerMetrics_Float64Precision verifies a large memory
// value round-trips exactly through storage and query.
func TestTSDB_QueryServerMetrics_Float64Precision(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	now := time.Now()
	serverID := uint64(1)
	largeMemValue := uint64(17_179_869_184) // 16GB
	err = db.WriteServerMetrics(&ServerMetrics{
		ServerID:  serverID,
		Timestamp: now,
		MemUsed:   largeMemValue,
	})
	require.NoError(t, err)
	db.Flush()
	result, err := db.QueryServerMetrics(serverID, MetricServerMemory, Period1Day)
	require.NoError(t, err)
	require.NotEmpty(t, result)
	// float64 represents this value exactly; float32 would lose precision.
	assert.Equal(t, float64(largeMemValue), result[0].Value)
}
// TestTSDB_QueryServiceHistoryByServerID writes two services probed from
// one server and verifies the per-service statistics.
func TestTSDB_QueryServiceHistoryByServerID(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	now := time.Now()
	serverID := uint64(1)
	serviceID1 := uint64(100)
	serviceID2 := uint64(200)
	// Write data for two services probed from the same server.
	for i := 0; i < 5; i++ {
		ts := now.Add(-time.Duration(i) * time.Minute)
		err := db.WriteServiceMetrics(&ServiceMetrics{
			ServiceID:  serviceID1,
			ServerID:   serverID,
			Timestamp:  ts,
			Delay:      float64(10 + i),
			Successful: true,
		})
		require.NoError(t, err)
		err = db.WriteServiceMetrics(&ServiceMetrics{
			ServiceID:  serviceID2,
			ServerID:   serverID,
			Timestamp:  ts,
			Delay:      float64(20 + i),
			Successful: i%2 == 0,
		})
		require.NoError(t, err)
	}
	db.Flush()
	results, err := db.QueryServiceHistoryByServerID(serverID, Period1Day)
	require.NoError(t, err)
	require.Len(t, results, 2, "expected 2 services")
	// Verify service 1: all probes succeeded.
	s1, ok := results[serviceID1]
	require.True(t, ok)
	assert.Equal(t, serviceID1, s1.ServiceID)
	require.Len(t, s1.Servers, 1)
	assert.Equal(t, serverID, s1.Servers[0].ServerID)
	assert.Equal(t, uint64(5), s1.Servers[0].Stats.TotalUp)
	assert.Equal(t, uint64(0), s1.Servers[0].Stats.TotalDown)
	// Verify service 2: partially successful.
	s2, ok := results[serviceID2]
	require.True(t, ok)
	assert.Equal(t, serviceID2, s2.ServiceID)
	assert.Equal(t, uint64(3), s2.Servers[0].Stats.TotalUp)
	assert.Equal(t, uint64(2), s2.Servers[0].Stats.TotalDown)
}
// TestTSDB_QueryServiceHistoryByServerID_Empty verifies a nonexistent
// server yields an empty result map without error.
func TestTSDB_QueryServiceHistoryByServerID_Empty(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	defer db.Close()
	results, err := db.QueryServiceHistoryByServerID(9999, Period1Day)
	require.NoError(t, err)
	assert.Empty(t, results)
}
// TestTSDB_QueryServiceHistoryByServerID_ClosedDB verifies the query fails
// after Close.
func TestTSDB_QueryServiceHistoryByServerID_ClosedDB(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "tsdb_test")
	require.NoError(t, err)
	defer os.RemoveAll(tempDir)
	config := &Config{
		DataPath:           filepath.Join(tempDir, "tsdb"),
		RetentionDays:      1,
		MinFreeDiskSpaceGB: 1,
		DedupInterval:      time.Second,
	}
	db, err := Open(config)
	require.NoError(t, err)
	db.Close()
	_, err = db.QueryServiceHistoryByServerID(1, Period1Day)
	assert.Error(t, err)
}
+301
View File
@@ -0,0 +1,301 @@
package tsdb
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
// bufferedWriter batches metric rows and flushes them to storage either
// when the buffer reaches maxSize or on a periodic ticker.
type bufferedWriter struct {
	db          *TSDB
	buffer      []storage.MetricRow // pending rows, guarded by mu
	mu          sync.Mutex          // guards buffer
	maxSize     int                 // flush threshold and reallocation capacity
	flushTicker *time.Ticker        // drives periodic flushes
	stopCh      chan struct{}       // closed by stop() to end flushLoop
	wg          sync.WaitGroup      // waits for flushLoop to exit
}
// newBufferedWriter creates a writer and starts its background flush loop.
// The caller must eventually call stop() to terminate the goroutine.
func newBufferedWriter(db *TSDB, maxSize int, flushInterval time.Duration) *bufferedWriter {
	w := &bufferedWriter{
		db:          db,
		buffer:      make([]storage.MetricRow, 0, maxSize),
		maxSize:     maxSize,
		flushTicker: time.NewTicker(flushInterval),
		stopCh:      make(chan struct{}),
	}
	w.wg.Add(1)
	go w.flushLoop()
	return w
}
// flushLoop flushes on every ticker tick and performs one final flush when
// stopCh is closed, then exits.
func (w *bufferedWriter) flushLoop() {
	defer w.wg.Done()
	for {
		select {
		case <-w.flushTicker.C:
			w.flush()
		case <-w.stopCh:
			w.flush() // drain remaining rows before exiting
			return
		}
	}
}
// write appends rows to the buffer; once the buffer reaches maxSize it is
// handed to storage immediately. AddRows runs outside the lock.
func (w *bufferedWriter) write(rows []storage.MetricRow) {
	w.mu.Lock()
	w.buffer = append(w.buffer, rows...)
	if len(w.buffer) < w.maxSize {
		w.mu.Unlock()
		return
	}
	pending := w.buffer
	w.buffer = make([]storage.MetricRow, 0, w.maxSize)
	w.mu.Unlock()
	w.db.storage.AddRows(pending, 64)
}
// flush hands any buffered rows to storage. AddRows runs outside the lock.
func (w *bufferedWriter) flush() {
	w.mu.Lock()
	pending := w.buffer
	if len(pending) == 0 {
		w.mu.Unlock()
		return
	}
	w.buffer = make([]storage.MetricRow, 0, w.maxSize)
	w.mu.Unlock()
	w.db.storage.AddRows(pending, 64)
}
// stop halts the ticker, signals flushLoop to perform a final flush, and
// waits for it to exit. Must be called at most once (stopCh is closed).
func (w *bufferedWriter) stop() {
	w.flushTicker.Stop()
	close(w.stopCh)
	w.wg.Wait()
}
// MetricType is the metric name (__name__) a sample is stored under.
type MetricType string

const (
	// Server metrics.
	MetricServerCPU            MetricType = "nezha_server_cpu"
	MetricServerMemory         MetricType = "nezha_server_memory"
	MetricServerSwap           MetricType = "nezha_server_swap"
	MetricServerDisk           MetricType = "nezha_server_disk"
	MetricServerNetInSpeed     MetricType = "nezha_server_net_in_speed"
	MetricServerNetOutSpeed    MetricType = "nezha_server_net_out_speed"
	MetricServerNetInTransfer  MetricType = "nezha_server_net_in_transfer"
	MetricServerNetOutTransfer MetricType = "nezha_server_net_out_transfer"
	MetricServerLoad1          MetricType = "nezha_server_load1"
	MetricServerLoad5          MetricType = "nezha_server_load5"
	MetricServerLoad15         MetricType = "nezha_server_load15"
	MetricServerTCPConn        MetricType = "nezha_server_tcp_conn"
	MetricServerUDPConn        MetricType = "nezha_server_udp_conn"
	MetricServerProcessCount   MetricType = "nezha_server_process_count"
	MetricServerTemperature    MetricType = "nezha_server_temperature"
	MetricServerUptime         MetricType = "nezha_server_uptime"
	MetricServerGPU            MetricType = "nezha_server_gpu"

	// Service monitor metrics.
	MetricServiceDelay  MetricType = "nezha_service_delay"
	MetricServiceStatus MetricType = "nezha_service_status"
)
// ServerMetrics is one snapshot of a server's resource metrics, written as
// 17 individual samples (one per MetricServer* type).
type ServerMetrics struct {
	ServerID       uint64
	Timestamp      time.Time
	CPU            float64
	MemUsed        uint64
	SwapUsed       uint64
	DiskUsed       uint64
	NetInSpeed     uint64
	NetOutSpeed    uint64
	NetInTransfer  uint64
	NetOutTransfer uint64
	Load1          float64
	Load5          float64
	Load15         float64
	TCPConnCount   uint64
	UDPConnCount   uint64
	ProcessCount   uint64
	Temperature    float64
	Uptime         uint64
	GPU            float64
}
// ServiceMetrics is one service probe result from one server, written as a
// delay sample and a status sample (1 = success, 0 = failure).
type ServiceMetrics struct {
	ServiceID  uint64
	ServerID   uint64
	Timestamp  time.Time
	Delay      float64
	Successful bool
}
func (db *TSDB) WriteServerMetrics(m *ServerMetrics) error {
db.mu.RLock()
defer db.mu.RUnlock()
if db.closed {
return fmt.Errorf("TSDB is closed")
}
ts := m.Timestamp.UnixMilli()
serverIDStr := strconv.FormatUint(m.ServerID, 10)
rows := []storage.MetricRow{
makeServerMetricRow(MetricServerCPU, serverIDStr, ts, m.CPU),
makeServerMetricRow(MetricServerMemory, serverIDStr, ts, float64(m.MemUsed)),
makeServerMetricRow(MetricServerSwap, serverIDStr, ts, float64(m.SwapUsed)),
makeServerMetricRow(MetricServerDisk, serverIDStr, ts, float64(m.DiskUsed)),
makeServerMetricRow(MetricServerNetInSpeed, serverIDStr, ts, float64(m.NetInSpeed)),
makeServerMetricRow(MetricServerNetOutSpeed, serverIDStr, ts, float64(m.NetOutSpeed)),
makeServerMetricRow(MetricServerNetInTransfer, serverIDStr, ts, float64(m.NetInTransfer)),
makeServerMetricRow(MetricServerNetOutTransfer, serverIDStr, ts, float64(m.NetOutTransfer)),
makeServerMetricRow(MetricServerLoad1, serverIDStr, ts, m.Load1),
makeServerMetricRow(MetricServerLoad5, serverIDStr, ts, m.Load5),
makeServerMetricRow(MetricServerLoad15, serverIDStr, ts, m.Load15),
makeServerMetricRow(MetricServerTCPConn, serverIDStr, ts, float64(m.TCPConnCount)),
makeServerMetricRow(MetricServerUDPConn, serverIDStr, ts, float64(m.UDPConnCount)),
makeServerMetricRow(MetricServerProcessCount, serverIDStr, ts, float64(m.ProcessCount)),
makeServerMetricRow(MetricServerTemperature, serverIDStr, ts, m.Temperature),
makeServerMetricRow(MetricServerUptime, serverIDStr, ts, float64(m.Uptime)),
makeServerMetricRow(MetricServerGPU, serverIDStr, ts, m.GPU),
}
if db.writer != nil {
db.writer.write(rows)
} else {
db.storage.AddRows(rows, 64)
}
return nil
}
// WriteServiceMetrics stores a single ServiceMetrics sample as two
// rows — the probe delay and its success status — labeled with both
// the service and server IDs.
// It returns an error only when the database has already been closed.
func (db *TSDB) WriteServiceMetrics(m *ServiceMetrics) error {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return fmt.Errorf("TSDB is closed")
	}

	stamp := m.Timestamp.UnixMilli()
	svcID := strconv.FormatUint(m.ServiceID, 10)
	srvID := strconv.FormatUint(m.ServerID, 10)

	// Encode the boolean probe outcome as 1 (success) / 0 (failure).
	status := 0.0
	if m.Successful {
		status = 1
	}

	rows := []storage.MetricRow{
		makeServiceMetricRow(MetricServiceDelay, svcID, srvID, stamp, m.Delay),
		makeServiceMetricRow(MetricServiceStatus, svcID, srvID, stamp, status),
	}

	// Route through the async writer when configured, otherwise
	// hand the rows straight to storage.
	if db.writer == nil {
		db.storage.AddRows(rows, 64)
	} else {
		db.writer.write(rows)
	}
	return nil
}
// makeServerMetricRow builds one storage row for a server-level
// metric, labeling it with the metric name and the server_id.
func makeServerMetricRow(metric MetricType, serverID string, timestamp int64, value float64) storage.MetricRow {
	row := storage.MetricRow{
		Timestamp: timestamp,
		Value:     value,
	}
	row.MetricNameRaw = storage.MarshalMetricNameRaw(nil, []prompb.Label{
		{Name: "__name__", Value: string(metric)},
		{Name: "server_id", Value: serverID},
	})
	return row
}
// makeServiceMetricRow builds one storage row for a service-level
// metric, labeling it with the metric name, service_id and server_id.
func makeServiceMetricRow(metric MetricType, serviceID, serverID string, timestamp int64, value float64) storage.MetricRow {
	row := storage.MetricRow{
		Timestamp: timestamp,
		Value:     value,
	}
	row.MetricNameRaw = storage.MarshalMetricNameRaw(nil, []prompb.Label{
		{Name: "__name__", Value: string(metric)},
		{Name: "service_id", Value: serviceID},
		{Name: "server_id", Value: serverID},
	})
	return row
}
// WriteBatchServerMetrics stores many ServerMetrics samples in a
// single storage call, building every row up front.
// It returns an error only when the database has already been closed.
func (db *TSDB) WriteBatchServerMetrics(metrics []*ServerMetrics) error {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return fmt.Errorf("TSDB is closed")
	}

	// 17 individual metrics are emitted per sample.
	rows := make([]storage.MetricRow, 0, len(metrics)*17)
	for _, sample := range metrics {
		stamp := sample.Timestamp.UnixMilli()
		sid := strconv.FormatUint(sample.ServerID, 10)
		for _, mv := range []struct {
			metric MetricType
			value  float64
		}{
			{MetricServerCPU, sample.CPU},
			{MetricServerMemory, float64(sample.MemUsed)},
			{MetricServerSwap, float64(sample.SwapUsed)},
			{MetricServerDisk, float64(sample.DiskUsed)},
			{MetricServerNetInSpeed, float64(sample.NetInSpeed)},
			{MetricServerNetOutSpeed, float64(sample.NetOutSpeed)},
			{MetricServerNetInTransfer, float64(sample.NetInTransfer)},
			{MetricServerNetOutTransfer, float64(sample.NetOutTransfer)},
			{MetricServerLoad1, sample.Load1},
			{MetricServerLoad5, sample.Load5},
			{MetricServerLoad15, sample.Load15},
			{MetricServerTCPConn, float64(sample.TCPConnCount)},
			{MetricServerUDPConn, float64(sample.UDPConnCount)},
			{MetricServerProcessCount, float64(sample.ProcessCount)},
			{MetricServerTemperature, sample.Temperature},
			{MetricServerUptime, float64(sample.Uptime)},
			{MetricServerGPU, sample.GPU},
		} {
			rows = append(rows, makeServerMetricRow(mv.metric, sid, stamp, mv.value))
		}
	}

	// Route through the async writer when configured, otherwise
	// hand the rows straight to storage.
	if db.writer == nil {
		db.storage.AddRows(rows, 64)
	} else {
		db.writer.write(rows)
	}
	return nil
}
// WriteBatchServiceMetrics stores many ServiceMetrics samples in a
// single storage call (two rows per sample: delay and status).
// It returns an error only when the database has already been closed.
func (db *TSDB) WriteBatchServiceMetrics(metrics []*ServiceMetrics) error {
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return fmt.Errorf("TSDB is closed")
	}

	rows := make([]storage.MetricRow, 0, len(metrics)*2)
	for _, sample := range metrics {
		stamp := sample.Timestamp.UnixMilli()
		svcID := strconv.FormatUint(sample.ServiceID, 10)
		srvID := strconv.FormatUint(sample.ServerID, 10)

		// Encode the boolean probe outcome as 1 (success) / 0 (failure).
		status := 0.0
		if sample.Successful {
			status = 1
		}
		rows = append(rows,
			makeServiceMetricRow(MetricServiceDelay, svcID, srvID, stamp, sample.Delay),
			makeServiceMetricRow(MetricServiceStatus, svcID, srvID, stamp, status),
		)
	}

	// Route through the async writer when configured, otherwise
	// hand the rows straight to storage.
	if db.writer == nil {
		db.storage.AddRows(rows, 64)
	} else {
		db.writer.write(rows)
	}
	return nil
}
+40
View File
@@ -0,0 +1,40 @@
package utils
import (
"fmt"
"math"
)
// https://github.com/dustin/go-humanize/blob/master/bytes.go

// logn returns the logarithm of x in base b, via the
// change-of-base identity log_b(x) = ln(x) / ln(b).
func logn(x, b float64) float64 {
	return math.Log(x) / math.Log(b)
}
// countDigits reports how many decimal digits n has. It returns 0
// for n == 0 (intentional: humanateBytes uses this so that a value
// like 0 contributes no integer digits to the precision budget).
func countDigits(n int64) int {
	count := 0
	for ; n != 0; n /= 10 {
		count++
	}
	return count
}
// humanateBytes renders s using the given base and suffix table,
// keeping roughly minDigits significant digits in the output.
// Values below 10 are printed as plain bytes with no scaling.
func humanateBytes(s uint64, base float64, minDigits int, sizes []string) string {
	if s < 10 {
		return fmt.Sprintf("%d B", s)
	}

	// Pick the largest unit that keeps the scaled value >= 1,
	// clamped to the end of the suffix table.
	exp := math.Floor(logn(float64(s), base))
	suffix := sizes[min(len(sizes)-1, int(exp))] // #nosec G602

	// Round the scaled value to minDigits significant digits.
	rounding := math.Pow10(minDigits - 1)
	val := math.Floor(float64(s)/math.Pow(base, exp)*rounding+0.5) / rounding

	// Decimal places shrink as the integer part grows, never below 0.
	decimals := max(minDigits-countDigits(int64(val)), 0)
	return fmt.Sprintf(fmt.Sprintf("%%.%df %%s", decimals), val, suffix)
}
// Bytes formats s as a human-readable size (e.g. "2.00 MB"),
// scaling by 1024 through the SI-style suffix table.
func Bytes(s uint64) string {
	units := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB"}
	return humanateBytes(s, 1024, 2, units)
}