improve transfer record logic (#1033)

* improve transfer record logic

* refactor

* modernize loops

* remove unused type conversions

* update dependencies

* script: keep .gitkeep files

* fix

* remove clear
This commit is contained in:
UUBulb
2025-03-19 22:21:21 +08:00
committed by GitHub
parent 5c252f5e43
commit fa36a36beb
14 changed files with 168 additions and 131 deletions

View File

@@ -117,10 +117,10 @@ func (s *NezhaHandler) ReportSystemState(stream pb.NezhaService_ReportSystemStat
server.LastActive = time.Now()
server.State = &innerState
// 应对 dashboard 重启的情况,如果从未记录过,先打点,等到小时时间点时入库
// 应对 dashboard / agent 重启的情况,如果从未记录过,先打点,等到小时时间点时入库
if server.PrevTransferInSnapshot == 0 || server.PrevTransferOutSnapshot == 0 {
server.PrevTransferInSnapshot = int64(state.NetInTransfer)
server.PrevTransferOutSnapshot = int64(state.NetOutTransfer)
server.PrevTransferInSnapshot = state.NetInTransfer
server.PrevTransferOutSnapshot = state.NetOutTransfer
}
if err = stream.Send(&pb.Receipt{Proced: true}); err != nil {
@@ -139,17 +139,18 @@ func (s *NezhaHandler) onReportSystemInfo(c context.Context, r *pb.Host) error {
server, ok := singleton.ServerShared.Get(clientID)
if !ok || server == nil {
return fmt.Errorf("server not found")
return errors.New("server not found")
}
/**
* 这里的 singleton 中的数据都是关机前的旧数据
* 当 agent 重启时bootTime 变大agent 端会先上报 host 信息,然后上报 state 信息
* 这可以借助上报顺序的空档,将停机前的流量统计数据标记下来,加到下一个小时的数据点上
* 这可以借助上报顺序的空档,立即记录停机前的数据并重置 Prev* 数据,并由接下来的 state 方法重新赋值
*/
if server.Host != nil && server.Host.BootTime < host.BootTime {
server.PrevTransferInSnapshot = server.PrevTransferInSnapshot - int64(server.State.NetInTransfer)
server.PrevTransferOutSnapshot = server.PrevTransferOutSnapshot - int64(server.State.NetOutTransfer)
if !server.LastActive.IsZero() && host.BootTime > server.Host.BootTime {
singleton.RecordTransferHourlyUsage(server)
server.PrevTransferInSnapshot = 0
server.PrevTransferOutSnapshot = 0
}
server.Host = &host

View File

@@ -36,19 +36,19 @@ func addCycleTransferStatsInfo(alert *model.AlertRule) {
if !alert.Enabled() {
return
}
for j := 0; j < len(alert.Rules); j++ {
if !alert.Rules[j].IsTransferDurationRule() {
for _, rule := range alert.Rules {
if !rule.IsTransferDurationRule() {
continue
}
if AlertsCycleTransferStatsStore[alert.ID] == nil {
from := alert.Rules[j].GetTransferDurationStart()
to := alert.Rules[j].GetTransferDurationEnd()
from := rule.GetTransferDurationStart()
to := rule.GetTransferDurationEnd()
AlertsCycleTransferStatsStore[alert.ID] = &model.CycleTransferStats{
Name: alert.Name,
From: from,
To: to,
Max: uint64(alert.Rules[j].Max),
Min: uint64(alert.Rules[j].Min),
Max: uint64(rule.Max),
Min: uint64(rule.Min),
ServerName: make(map[uint64]string),
Transfer: make(map[uint64]uint64),
NextUpdate: make(map[uint64]time.Time),

View File

@@ -132,8 +132,8 @@ func ManualTrigger(cr *model.Cron) {
func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
crIgnoreMap := make(map[uint64]bool)
for j := 0; j < len(cr.Servers); j++ {
crIgnoreMap[cr.Servers[j]] = true
for _, server := range cr.Servers {
crIgnoreMap[server] = true
}
return func() {
if cr.Cover == model.CronCoverAlertTrigger {
@@ -151,7 +151,7 @@ func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
// 保存当前服务器状态信息
curServer := model.Server{}
copier.Copy(&curServer, s)
NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), "", &curServer)
go NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), "", &curServer)
}
}
return
@@ -174,7 +174,7 @@ func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
// 保存当前服务器状态信息
curServer := model.Server{}
copier.Copy(&curServer, s)
NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), "", &curServer)
go NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), "", &curServer)
}
}
}

View File

@@ -39,14 +39,15 @@ var (
//go:embed frontend-templates.yaml
var frontendTemplatesYAML []byte
func InitTimezoneAndCache() {
func InitTimezoneAndCache() error {
var err error
Loc, err = time.LoadLocation(Conf.Location)
if err != nil {
panic(err)
return err
}
Cache = cache.New(5*time.Minute, 10*time.Minute)
return nil
}
// LoadSingleton 加载子服务并执行
@@ -61,21 +62,22 @@ func LoadSingleton() {
}
// InitFrontendTemplates 从内置文件中加载FrontendTemplates
func InitFrontendTemplates() {
func InitFrontendTemplates() error {
err := yaml.Unmarshal(frontendTemplatesYAML, &FrontendTemplates)
if err != nil {
panic(err)
return err
}
return nil
}
// InitDBFromPath 从给出的文件路径中加载数据库
func InitDBFromPath(path string) {
func InitDBFromPath(path string) error {
var err error
DB, err = gorm.Open(sqlite.Open(path), &gorm.Config{
CreateBatchSize: 200,
})
if err != nil {
panic(err)
return err
}
if Conf.Debug {
DB = DB.Debug()
@@ -86,32 +88,39 @@ func InitDBFromPath(path string) {
model.NAT{}, model.DDNSProfile{}, model.NotificationGroupNotification{},
model.WAF{}, model.Oauth2Bind{})
if err != nil {
panic(err)
return err
}
return nil
}
// RecordTransferHourlyUsage 对流量记录进行打点
func RecordTransferHourlyUsage() {
ServerShared.listMu.RLock()
defer ServerShared.listMu.RUnlock()
func RecordTransferHourlyUsage(servers ...*model.Server) {
now := time.Now()
nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
var txs []model.Transfer
for id, server := range ServerShared.list {
var slist iter.Seq[*model.Server]
if len(servers) > 0 {
slist = slices.Values(servers)
} else {
slist = utils.Seq2To1(ServerShared.Range)
}
for server := range slist {
tx := model.Transfer{
ServerID: id,
In: utils.Uint64SubInt64(server.State.NetInTransfer, server.PrevTransferInSnapshot),
Out: utils.Uint64SubInt64(server.State.NetOutTransfer, server.PrevTransferOutSnapshot),
ServerID: server.ID,
In: utils.SubUintChecked(server.State.NetInTransfer, server.PrevTransferInSnapshot),
Out: utils.SubUintChecked(server.State.NetOutTransfer, server.PrevTransferOutSnapshot),
}
if tx.In == 0 && tx.Out == 0 {
continue
}
server.PrevTransferInSnapshot = int64(server.State.NetInTransfer)
server.PrevTransferOutSnapshot = int64(server.State.NetOutTransfer)
server.PrevTransferInSnapshot = server.State.NetInTransfer
server.PrevTransferOutSnapshot = server.State.NetOutTransfer
tx.CreatedAt = nowTrimSeconds
txs = append(txs, tx)
}
if len(txs) == 0 {
return
}