mirror of
https://github.com/Buriburizaem0n/nezha_domains.git
synced 2026-02-05 05:00:05 +00:00
refactor: simplify server & service manipulation (#993)
* refactor: simplify server & service manipulation * update * fix * update for nat, ddns & notification * chore * update cron * update dependencies * use of function iterators * update default dns servers
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/jinzhu/copier"
|
||||
|
||||
@@ -16,36 +15,32 @@ import (
|
||||
pb "github.com/nezhahq/nezha/proto"
|
||||
)
|
||||
|
||||
var (
|
||||
Cron *cron.Cron
|
||||
Crons map[uint64]*model.Cron // [CronID] -> *model.Cron
|
||||
CronLock sync.RWMutex
|
||||
|
||||
CronList []*model.Cron
|
||||
)
|
||||
|
||||
func InitCronTask() {
|
||||
Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
|
||||
Crons = make(map[uint64]*model.Cron)
|
||||
type CronClass struct {
|
||||
class[uint64, *model.Cron]
|
||||
*cron.Cron
|
||||
}
|
||||
|
||||
// loadCronTasks 加载计划任务
|
||||
func loadCronTasks() {
|
||||
InitCronTask()
|
||||
DB.Find(&CronList)
|
||||
func NewCronClass() *CronClass {
|
||||
cronx := cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
|
||||
list := make(map[uint64]*model.Cron)
|
||||
|
||||
var sortedList []*model.Cron
|
||||
DB.Find(&sortedList)
|
||||
|
||||
var err error
|
||||
var notificationGroupList []uint64
|
||||
notificationMsgMap := make(map[uint64]*strings.Builder)
|
||||
for _, cron := range CronList {
|
||||
|
||||
for _, cron := range sortedList {
|
||||
// 触发任务类型无需注册
|
||||
if cron.TaskType == model.CronTypeTriggerTask {
|
||||
Crons[cron.ID] = cron
|
||||
list[cron.ID] = cron
|
||||
continue
|
||||
}
|
||||
// 注册计划任务
|
||||
cron.CronJobID, err = Cron.AddFunc(cron.Scheduler, CronTrigger(cron))
|
||||
cron.CronJobID, err = cronx.AddFunc(cron.Scheduler, CronTrigger(cron))
|
||||
if err == nil {
|
||||
Crons[cron.ID] = cron
|
||||
list[cron.ID] = cron
|
||||
} else {
|
||||
// 当前通知组首次出现 将其加入通知组列表并初始化通知组消息缓存
|
||||
if _, ok := notificationMsgMap[cron.NotificationGroupID]; !ok {
|
||||
@@ -56,61 +51,74 @@ func loadCronTasks() {
|
||||
notificationMsgMap[cron.NotificationGroupID].WriteString(fmt.Sprintf("%d,", cron.ID))
|
||||
}
|
||||
}
|
||||
|
||||
// 向注册错误的计划任务所在通知组发送通知
|
||||
for _, gid := range notificationGroupList {
|
||||
notificationMsgMap[gid].WriteString(Localizer.T("] These tasks will not execute properly. Fix them in the admin dashboard."))
|
||||
SendNotification(gid, notificationMsgMap[gid].String(), nil)
|
||||
NotificationShared.SendNotification(gid, notificationMsgMap[gid].String(), nil)
|
||||
}
|
||||
cronx.Start()
|
||||
|
||||
return &CronClass{
|
||||
class: class[uint64, *model.Cron]{
|
||||
list: list,
|
||||
sortedList: sortedList,
|
||||
},
|
||||
Cron: cronx,
|
||||
}
|
||||
Cron.Start()
|
||||
}
|
||||
|
||||
func OnRefreshOrAddCron(c *model.Cron) {
|
||||
CronLock.Lock()
|
||||
defer CronLock.Unlock()
|
||||
crOld := Crons[c.ID]
|
||||
func (c *CronClass) Update(cr *model.Cron) {
|
||||
c.listMu.Lock()
|
||||
crOld := c.list[cr.ID]
|
||||
if crOld != nil && crOld.CronJobID != 0 {
|
||||
Cron.Remove(crOld.CronJobID)
|
||||
c.Cron.Remove(crOld.CronJobID)
|
||||
}
|
||||
|
||||
delete(Crons, c.ID)
|
||||
Crons[c.ID] = c
|
||||
delete(c.list, cr.ID)
|
||||
c.list[cr.ID] = cr
|
||||
c.listMu.Unlock()
|
||||
|
||||
c.sortList()
|
||||
}
|
||||
|
||||
func UpdateCronList() {
|
||||
CronLock.RLock()
|
||||
defer CronLock.RUnlock()
|
||||
func (c *CronClass) Delete(idList []uint64) {
|
||||
c.listMu.Lock()
|
||||
for _, id := range idList {
|
||||
cr := c.list[id]
|
||||
if cr != nil && cr.CronJobID != 0 {
|
||||
c.Cron.Remove(cr.CronJobID)
|
||||
}
|
||||
delete(c.list, id)
|
||||
}
|
||||
c.listMu.Unlock()
|
||||
|
||||
CronList = utils.MapValuesToSlice(Crons)
|
||||
slices.SortFunc(CronList, func(a, b *model.Cron) int {
|
||||
c.sortList()
|
||||
}
|
||||
|
||||
func (c *CronClass) sortList() {
|
||||
c.listMu.RLock()
|
||||
defer c.listMu.RUnlock()
|
||||
|
||||
sortedList := utils.MapValuesToSlice(c.list)
|
||||
slices.SortFunc(sortedList, func(a, b *model.Cron) int {
|
||||
return cmp.Compare(a.ID, b.ID)
|
||||
})
|
||||
|
||||
c.sortedListMu.Lock()
|
||||
defer c.sortedListMu.Unlock()
|
||||
c.sortedList = sortedList
|
||||
}
|
||||
|
||||
func OnDeleteCron(id []uint64) {
|
||||
CronLock.Lock()
|
||||
defer CronLock.Unlock()
|
||||
for _, i := range id {
|
||||
cr := Crons[i]
|
||||
if cr != nil && cr.CronJobID != 0 {
|
||||
Cron.Remove(cr.CronJobID)
|
||||
}
|
||||
delete(Crons, i)
|
||||
}
|
||||
}
|
||||
|
||||
func ManualTrigger(c *model.Cron) {
|
||||
CronTrigger(c)()
|
||||
}
|
||||
|
||||
func SendTriggerTasks(taskIDs []uint64, triggerServer uint64) {
|
||||
CronLock.RLock()
|
||||
func (c *CronClass) SendTriggerTasks(taskIDs []uint64, triggerServer uint64) {
|
||||
c.listMu.RLock()
|
||||
var cronLists []*model.Cron
|
||||
for _, taskID := range taskIDs {
|
||||
if c, ok := Crons[taskID]; ok {
|
||||
if c, ok := c.list[taskID]; ok {
|
||||
cronLists = append(cronLists, c)
|
||||
}
|
||||
}
|
||||
CronLock.RUnlock()
|
||||
c.listMu.RUnlock()
|
||||
|
||||
// 依次调用CronTrigger发送任务
|
||||
for _, c := range cronLists {
|
||||
@@ -118,6 +126,10 @@ func SendTriggerTasks(taskIDs []uint64, triggerServer uint64) {
|
||||
}
|
||||
}
|
||||
|
||||
func ManualTrigger(cr *model.Cron) {
|
||||
CronTrigger(cr)()
|
||||
}
|
||||
|
||||
func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
|
||||
crIgnoreMap := make(map[uint64]bool)
|
||||
for j := 0; j < len(cr.Servers); j++ {
|
||||
@@ -128,9 +140,7 @@ func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
|
||||
if len(triggerServer) == 0 {
|
||||
return
|
||||
}
|
||||
ServerLock.RLock()
|
||||
defer ServerLock.RUnlock()
|
||||
if s, ok := ServerList[triggerServer[0]]; ok {
|
||||
if s, ok := ServerShared.Get(triggerServer[0]); ok {
|
||||
if s.TaskStream != nil {
|
||||
s.TaskStream.Send(&pb.Task{
|
||||
Id: cr.ID,
|
||||
@@ -141,15 +151,13 @@ func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
|
||||
// 保存当前服务器状态信息
|
||||
curServer := model.Server{}
|
||||
copier.Copy(&curServer, s)
|
||||
SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), nil, &curServer)
|
||||
NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), nil, &curServer)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ServerLock.RLock()
|
||||
defer ServerLock.RUnlock()
|
||||
for _, s := range ServerList {
|
||||
for _, s := range ServerShared.Range {
|
||||
if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
|
||||
continue
|
||||
}
|
||||
@@ -166,7 +174,7 @@ func CronTrigger(cr *model.Cron, triggerServer ...uint64) func() {
|
||||
// 保存当前服务器状态信息
|
||||
curServer := model.Server{}
|
||||
copier.Copy(&curServer, s)
|
||||
SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), nil, &curServer)
|
||||
NotificationShared.SendNotification(cr.NotificationGroupID, Localizer.Tf("[Task failed] %s: server %s is offline and cannot execute the task", cr.Name, s.Name), nil, &curServer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user