mirror of
https://github.com/Buriburizaem0n/nezha_domains.git
synced 2026-05-06 05:38:50 +00:00
e61772e858
* feat: tsdb * fix(ci): remove --parseGoList=false from swag init to fix dependency resolution * fix(ci): fix swag init directory and temporary remove s390x support due to cgo issues * fix(ci): fix swag init output directory to cmd/dashboard/docs * fix(ci): set GOTOOLCHAIN=auto for gosec * feat: add system storage maintenance for SQLite and TSDB * shit * feat: add s390x support and improve service monitoring * ci: upgrade goreleaser-cross image to v1.25 * ci: add libzstd-dev:s390x for cross-compilation * ci: build libzstd for s390x from source * ci: add libzstd_linux_s390x.go for gozstd linking * ci: use vendor mode for s390x gozstd build * ci: clone zstd source for s390x build * refactor(tsdb): rename MaxDiskUsageGB to MinFreeDiskSpaceGB and optimize queries - Rename config to accurately reflect VictoriaMetrics behavior: minimum free disk space threshold - Add QueryServiceHistoryByServerID for batch query optimization - Fix hasStatus to avoid false status counting when only delay data exists - Fix service aggregation boundary: use successCount*2 >= count - Fix serviceID parsing with strconv.ParseUint error handling - Add TagFiltersCacheSize for better query performance * feat(api): add server metrics endpoint and simplify service history response - Add /server/:id/metrics API for querying TSDB server metrics - Simplify getServiceHistory by removing redundant data conversion - Change AvgDelay type from float32 to float64 - Remove generated swagger docs (to be regenerated) - Update TSDB query, writer and tests * chore: 临时禁用不支持前端 * ci: cache zstd build for s390x to speed up CI * fix(tsdb): fix race conditions, data correctness and optimize performance - Fix TOCTOU race between IsClosed() and write/query by holding RLock - Fix delay=0 excluded from stats by using hasDelay flag instead of value > 0 - Fix fmt.Sscanf -> strconv.ParseUint for server_id parsing with error logging - Fix buffer unbounded growth by flushing inside lock when over maxSize - Split makeMetricRow 
into makeServerMetricRow/makeServiceMetricRow - Extract InitGlobalSettings() from Open() for VictoriaMetrics globals - Remove redundant instance/GetInstance/SetInstance singleton - Add error logging for silently skipped block decode errors - Optimize WriteBatch* to build all rows in single write call - Optimize downsample to use linear scan instead of map for sorted data - Optimize query slice reuse across block iterations * 服务添加DisplayIndex (#1166) * 服务添加DisplayIndex * 根据ai建议修改 --------- Co-authored-by: huYang <306061454@qq.com> * fix(tsdb): restore SQLite fallback and monthly status reload on restart - Restore ServiceHistory model and SQLite write fallback when TSDB is disabled - Reload monthlyStatus (30-day) and serviceStatusToday from TSDB/SQLite on startup - Add SQLite fallback query for /service/:id/history and /server/:id/service - Remove breaking GET /service/:id endpoint, keep /service/:id/history only - Add QueryServiceDailyStats to TSDB for per-day aggregation - Add tests for monthly status and today stats loading from both TSDB and SQLite - Migrate ServiceHistory table only when TSDB is disabled * ci: exclude false-positive gosec rules G117, G703, G704 * feat(api): expose tsdb_enabled in setting response * ci: restore G115 exclusion accidentally dropped in previous commit * fix: update version numbers for OfficialAdmin and Official templates * chore: upgrade frontend * chore: upgrade frontend --------- Co-authored-by: 胡说丷刂 <34758853+laosan-xx@users.noreply.github.com> Co-authored-by: huYang <306061454@qq.com>
321 lines
9.1 KiB
Go
321 lines
9.1 KiB
Go
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jinzhu/copier"
|
|
geoipx "github.com/nezhahq/nezha/pkg/geoip"
|
|
"github.com/nezhahq/nezha/pkg/grpcx"
|
|
"github.com/nezhahq/nezha/pkg/tsdb"
|
|
|
|
"github.com/nezhahq/nezha/model"
|
|
pb "github.com/nezhahq/nezha/proto"
|
|
"github.com/nezhahq/nezha/service/singleton"
|
|
)
|
|
|
|
// Compile-time assertion that NezhaHandler implements the gRPC service interface.
var _ pb.NezhaServiceServer = (*NezhaHandler)(nil)

// NezhaHandlerSingleton is the process-wide handler instance registered with the gRPC server.
var NezhaHandlerSingleton *NezhaHandler
|
|
|
|
// NezhaHandler implements pb.NezhaServiceServer and holds the state shared
// across agent-facing RPCs.
type NezhaHandler struct {
	// Auth authenticates incoming agent requests from their stream/request context.
	Auth *authHandler
	// ioStreams tracks active IO-stream sessions keyed by stream ID;
	// access is guarded by ioStreamMutex.
	ioStreams     map[string]*ioStreamContext
	ioStreamMutex *sync.RWMutex
}
|
|
|
|
func NewNezhaHandler() *NezhaHandler {
|
|
return &NezhaHandler{
|
|
Auth: &authHandler{},
|
|
ioStreamMutex: new(sync.RWMutex),
|
|
ioStreams: make(map[string]*ioStreamContext),
|
|
}
|
|
}
|
|
|
|
func (s *NezhaHandler) RequestTask(stream pb.NezhaService_RequestTaskServer) error {
|
|
var clientID uint64
|
|
var err error
|
|
if clientID, err = s.Auth.Check(stream.Context()); err != nil {
|
|
return err
|
|
}
|
|
|
|
server, _ := singleton.ServerShared.Get(clientID)
|
|
server.TaskStream = stream
|
|
var result *pb.TaskResult
|
|
for {
|
|
result, err = stream.Recv()
|
|
if err != nil {
|
|
log.Printf("NEZHA>> RequestTask error: %v, clientID: %d\n", err, clientID)
|
|
return err
|
|
}
|
|
switch result.GetType() {
|
|
case model.TaskTypeCommand:
|
|
// 处理上报的计划任务
|
|
cr, _ := singleton.CronShared.Get(result.GetId())
|
|
if cr != nil {
|
|
// 保存当前服务器状态信息
|
|
var curServer model.Server
|
|
copier.Copy(&curServer, server)
|
|
if cr.PushSuccessful && result.GetSuccessful() {
|
|
singleton.NotificationShared.SendNotification(cr.NotificationGroupID, fmt.Sprintf("[%s] %s, %s\n%s", singleton.Localizer.T("Scheduled Task Executed Successfully"),
|
|
cr.Name, server.Name, result.GetData()), "", &curServer)
|
|
}
|
|
if !result.GetSuccessful() {
|
|
singleton.NotificationShared.SendNotification(cr.NotificationGroupID, fmt.Sprintf("[%s] %s, %s\n%s", singleton.Localizer.T("Scheduled Task Executed Failed"),
|
|
cr.Name, server.Name, result.GetData()), "", &curServer)
|
|
}
|
|
singleton.DB.Model(cr).Updates(model.Cron{
|
|
LastExecutedAt: time.Now().Add(time.Second * -1 * time.Duration(result.GetDelay())),
|
|
LastResult: result.GetSuccessful(),
|
|
})
|
|
}
|
|
case model.TaskTypeReportConfig:
|
|
if len(server.ConfigCache) < 1 {
|
|
if !result.GetSuccessful() {
|
|
server.ConfigCache <- errors.New(result.Data)
|
|
continue
|
|
}
|
|
server.ConfigCache <- result.Data
|
|
}
|
|
default:
|
|
if model.IsServiceSentinelNeeded(result.GetType()) {
|
|
singleton.ServiceSentinelShared.Dispatch(singleton.ReportData{
|
|
Data: result,
|
|
Reporter: clientID,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReportSystemState receives periodic runtime state from the agent over a
// long-lived stream, updates the in-memory server record, optionally writes
// the sample to the TSDB, and acknowledges each message with a receipt.
// Returns only when the stream errors out or the server record disappears.
func (s *NezhaHandler) ReportSystemState(stream pb.NezhaService_ReportSystemStateServer) error {
	clientID, err := s.Auth.Check(stream.Context())
	if err != nil {
		return err
	}
	var state *pb.State
	for {
		state, err = stream.Recv()
		if err != nil {
			log.Printf("NEZHA>> ReportSystemState error: %v, clientID: %d\n", err, clientID)
			return err
		}
		innerState := model.PB2State(state)

		server, ok := singleton.ServerShared.Get(clientID)
		if !ok || server == nil {
			return errors.New("server not found")
		}

		server.LastActive = time.Now()
		server.State = &innerState

		if singleton.TSDBEnabled() {
			// Reduce the per-sensor temperature list to a single maximum value.
			maxTemp := 0.0
			for _, t := range innerState.Temperatures {
				if t.Temperature > maxTemp {
					maxTemp = t.Temperature
				}
			}
			// Likewise keep only the busiest GPU's utilization figure.
			maxGPU := 0.0
			for _, g := range innerState.GPU {
				if g > maxGPU {
					maxGPU = g
				}
			}
			// Best-effort write: a TSDB failure is logged but does not break the stream.
			if err := singleton.TSDBShared.WriteServerMetrics(&tsdb.ServerMetrics{
				ServerID:       clientID,
				Timestamp:      time.Now(),
				CPU:            innerState.CPU,
				MemUsed:        innerState.MemUsed,
				SwapUsed:       innerState.SwapUsed,
				DiskUsed:       innerState.DiskUsed,
				NetInSpeed:     innerState.NetInSpeed,
				NetOutSpeed:    innerState.NetOutSpeed,
				NetInTransfer:  innerState.NetInTransfer,
				NetOutTransfer: innerState.NetOutTransfer,
				Load1:          innerState.Load1,
				Load5:          innerState.Load5,
				Load15:         innerState.Load15,
				TCPConnCount:   innerState.TcpConnCount,
				UDPConnCount:   innerState.UdpConnCount,
				ProcessCount:   innerState.ProcessCount,
				Temperature:    maxTemp,
				Uptime:         innerState.Uptime,
				GPU:            maxGPU,
			}); err != nil {
				log.Printf("NEZHA>> Failed to write server metrics to TSDB: %v", err)
			}
		}

		// Handle dashboard/agent restarts: if no transfer snapshot was ever
		// recorded, take one now; it is persisted at the next hourly mark.
		if server.PrevTransferInSnapshot == 0 || server.PrevTransferOutSnapshot == 0 {
			server.PrevTransferInSnapshot = state.NetInTransfer
			server.PrevTransferOutSnapshot = state.NetOutTransfer
		}

		if err = stream.Send(&pb.Receipt{Proced: true}); err != nil {
			return err
		}
	}
}
|
|
|
|
func (s *NezhaHandler) onReportSystemInfo(c context.Context, r *pb.Host) error {
|
|
var clientID uint64
|
|
var err error
|
|
if clientID, err = s.Auth.Check(c); err != nil {
|
|
return err
|
|
}
|
|
host := model.PB2Host(r)
|
|
|
|
server, ok := singleton.ServerShared.Get(clientID)
|
|
if !ok || server == nil {
|
|
return errors.New("server not found")
|
|
}
|
|
|
|
/**
|
|
* 这里的 singleton 中的数据都是关机前的旧数据
|
|
* 当 agent 重启时,bootTime 变大,agent 端会先上报 host 信息,然后上报 state 信息
|
|
* 这时可以借助上报顺序的空档,立即记录停机前的数据并重置 Prev* 数据,并由接下来的 state 方法重新赋值
|
|
*/
|
|
if !server.LastActive.IsZero() && host.BootTime > server.Host.BootTime {
|
|
singleton.RecordTransferHourlyUsage(server)
|
|
server.PrevTransferInSnapshot = 0
|
|
server.PrevTransferOutSnapshot = 0
|
|
}
|
|
|
|
server.Host = &host
|
|
return nil
|
|
}
|
|
|
|
func (s *NezhaHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Receipt, error) {
|
|
if err := s.onReportSystemInfo(c, r); err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.Receipt{Proced: true}, nil
|
|
}
|
|
|
|
func (s *NezhaHandler) ReportSystemInfo2(c context.Context, r *pb.Host) (*pb.Uint64Receipt, error) {
|
|
if err := s.onReportSystemInfo(c, r); err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.Uint64Receipt{Data: singleton.DashboardBootTime}, nil
|
|
}
|
|
|
|
func (s *NezhaHandler) IOStream(stream pb.NezhaService_IOStreamServer) error {
|
|
if _, err := s.Auth.Check(stream.Context()); err != nil {
|
|
return err
|
|
}
|
|
id, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// ff05ff05 是 Nezha 的魔数,用于标识流 ID
|
|
if id == nil || len(id.Data) < 4 || (id.Data[0] != 0xff && id.Data[1] != 0x05 && id.Data[2] != 0xff && id.Data[3] == 0x05) {
|
|
return fmt.Errorf("invalid stream id")
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
if err := stream.Send(&pb.IOStreamData{Data: []byte{}}); err != nil {
|
|
log.Printf("NEZHA>> IOStream keepAlive error: %v\n", err)
|
|
return
|
|
}
|
|
time.Sleep(time.Second * 30)
|
|
}
|
|
}()
|
|
|
|
streamId := string(id.Data[4:])
|
|
|
|
if _, err := s.GetStream(streamId); err != nil {
|
|
return err
|
|
}
|
|
iw := grpcx.NewIOStreamWrapper(stream)
|
|
if err := s.AgentConnected(streamId, iw); err != nil {
|
|
return err
|
|
}
|
|
iw.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (s *NezhaHandler) ReportGeoIP(c context.Context, r *pb.GeoIP) (*pb.GeoIP, error) {
|
|
var clientID uint64
|
|
var err error
|
|
if clientID, err = s.Auth.Check(c); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
geoip := model.PB2GeoIP(r)
|
|
use6 := r.GetUse6()
|
|
|
|
if geoip.IP.IPv4Addr == "" && geoip.IP.IPv6Addr == "" {
|
|
ip, _ := c.Value(model.CtxKeyRealIP{}).(string)
|
|
if ip == "" {
|
|
ip, _ = c.Value(model.CtxKeyConnectingIP{}).(string)
|
|
}
|
|
geoip.IP.IPv4Addr = ip
|
|
}
|
|
|
|
joinedIP := geoip.IP.Join()
|
|
|
|
server, ok := singleton.ServerShared.Get(clientID)
|
|
if !ok || server == nil {
|
|
return nil, fmt.Errorf("server not found")
|
|
}
|
|
|
|
// 检查并更新DDNS
|
|
if server.EnableDDNS && joinedIP != "" &&
|
|
(server.GeoIP == nil || server.GeoIP.IP != geoip.IP) {
|
|
ipv4 := geoip.IP.IPv4Addr
|
|
ipv6 := geoip.IP.IPv6Addr
|
|
|
|
if err := singleton.ServerShared.UpdateDDNS(server, &model.IP{IPv4Addr: ipv4, IPv6Addr: ipv6}); err != nil {
|
|
log.Printf("NEZHA>> Failed to update DDNS for server %d: %v", err, server.ID)
|
|
}
|
|
}
|
|
|
|
// 发送IP变动通知
|
|
if server.GeoIP != nil && singleton.Conf.EnableIPChangeNotification &&
|
|
((singleton.Conf.Cover == model.ConfigCoverAll && !singleton.Conf.IgnoredIPNotificationServerIDs[clientID]) ||
|
|
(singleton.Conf.Cover == model.ConfigCoverIgnoreAll && singleton.Conf.IgnoredIPNotificationServerIDs[clientID])) &&
|
|
server.GeoIP.IP.Join() != "" &&
|
|
joinedIP != "" &&
|
|
server.GeoIP.IP != geoip.IP {
|
|
|
|
singleton.NotificationShared.SendNotification(singleton.Conf.IPChangeNotificationGroupID,
|
|
fmt.Sprintf(
|
|
"[%s] %s, %s => %s",
|
|
singleton.Localizer.T("IP Changed"),
|
|
server.Name, singleton.IPDesensitize(server.GeoIP.IP.Join()),
|
|
singleton.IPDesensitize(joinedIP),
|
|
),
|
|
"")
|
|
}
|
|
|
|
// 根据内置数据库查询 IP 地理位置
|
|
var ip string
|
|
if geoip.IP.IPv6Addr != "" && (use6 || geoip.IP.IPv4Addr == "") {
|
|
ip = geoip.IP.IPv6Addr
|
|
} else {
|
|
ip = geoip.IP.IPv4Addr
|
|
}
|
|
|
|
netIP := net.ParseIP(ip)
|
|
location, err := geoipx.Lookup(netIP)
|
|
if err != nil {
|
|
log.Printf("NEZHA>> geoip.Lookup: %v", err)
|
|
}
|
|
geoip.CountryCode = location
|
|
|
|
// 将地区码写入到 Host
|
|
server.GeoIP = &geoip
|
|
|
|
return &pb.GeoIP{Ip: nil, CountryCode: location, DashboardBootTime: singleton.DashboardBootTime}, nil
|
|
}
|