test: 增加MinReportTraffic最低流量上报阈值

This commit is contained in:
wyx2685
2025-08-07 14:18:12 +09:00
parent 9be082ede6
commit f7b588fb45
18 changed files with 238 additions and 128 deletions

View File

@@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/InazumaV/V2bX/common/counter"
"github.com/InazumaV/V2bX/common/rate"
"github.com/InazumaV/V2bX/limiter"
@@ -98,12 +99,13 @@ func (r *cachedReader) Interrupt() {
// DefaultDispatcher is a default implementation of Dispatcher.
type DefaultDispatcher struct {
ohm outbound.Manager
router routing.Router
policy policy.Manager
stats stats.Manager
fdns dns.FakeDNSEngine
Wm *WriterManager
ohm outbound.Manager
router routing.Router
policy policy.Manager
stats stats.Manager
fdns dns.FakeDNSEngine
Wm *WriterManager
Counter sync.Map
}
func init() {
@@ -204,24 +206,22 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (*
inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w)
outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w)
}
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
inboundLink.Writer = &SizeStatWriter{
Counter: c,
Writer: inboundLink.Writer,
}
}
var t *counter.TrafficCounter
if c, ok := d.Counter.Load(sessionInbound.Tag); !ok {
t = counter.NewTrafficCounter()
d.Counter.Store(sessionInbound.Tag, t)
} else {
t = c.(*counter.TrafficCounter)
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
outboundLink.Writer = &SizeStatWriter{
Counter: c,
Writer: outboundLink.Writer,
}
}
inboundLink.Writer = &UploadTrafficWriter{
Counter: t.GetCounter(user.Email),
Writer: inboundLink.Writer,
}
outboundLink.Writer = &DownloadTrafficWriter{
Counter: t.GetCounter(user.Email),
Writer: outboundLink.Writer,
}
}

View File

@@ -1,25 +1,43 @@
package dispatcher
import (
"github.com/InazumaV/V2bX/common/counter"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/features/stats"
)
type SizeStatWriter struct {
Counter stats.Counter
type UploadTrafficWriter struct {
Counter *counter.TrafficStorage
Writer buf.Writer
}
func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.Add(int64(mb.Len()))
type DownloadTrafficWriter struct {
Counter *counter.TrafficStorage
Writer buf.Writer
}
func (w *UploadTrafficWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.UpCounter.Add(int64(mb.Len()))
return w.Writer.WriteMultiBuffer(mb)
}
func (w *SizeStatWriter) Close() error {
func (w *UploadTrafficWriter) Close() error {
return common.Close(w.Writer)
}
func (w *SizeStatWriter) Interrupt() {
func (w *UploadTrafficWriter) Interrupt() {
common.Interrupt(w.Writer)
}
func (w *DownloadTrafficWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.DownCounter.Add(int64(mb.Len()))
return w.Writer.WriteMultiBuffer(mb)
}
func (w *DownloadTrafficWriter) Close() error {
return common.Close(w.Writer)
}
func (w *DownloadTrafficWriter) Interrupt() {
common.Interrupt(w.Writer)
}

View File

@@ -17,6 +17,7 @@ type DNSConfig struct {
}
func (c *Xray) AddNode(tag string, info *panel.NodeInfo, config *conf.Options) error {
c.nodeReportMinTrafficBytes[tag] = config.ReportMinTraffic * 1024
err := updateDNSConfig(info)
if err != nil {
return fmt.Errorf("build dns error: %s", err)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/InazumaV/V2bX/api/panel"
"github.com/InazumaV/V2bX/common/counter"
"github.com/InazumaV/V2bX/common/format"
vCore "github.com/InazumaV/V2bX/core"
"github.com/xtls/xray-core/common/protocol"
@@ -32,47 +33,61 @@ func (c *Xray) DelUsers(users []panel.UserInfo, tag string, _ *panel.NodeInfo) e
if err != nil {
return fmt.Errorf("get user manager error: %s", err)
}
var up, down, user string
var user string
c.users.mapLock.Lock()
defer c.users.mapLock.Unlock()
for i := range users {
user = format.UserTag(tag, users[i].Uuid)
err = userManager.RemoveUser(context.Background(), user)
if err != nil {
return err
}
up = "user>>>" + user + ">>>traffic>>>uplink"
down = "user>>>" + user + ">>>traffic>>>downlink"
c.shm.UnregisterCounter(up)
c.shm.UnregisterCounter(down)
delete(c.users.uidMap, user)
c.dispatcher.Counter.Delete(user)
c.dispatcher.Wm.RemoveWritersForUser(user)
}
return nil
}
func (c *Xray) GetUserTraffic(tag, uuid string, reset bool) (up int64, down int64) {
upName := "user>>>" + format.UserTag(tag, uuid) + ">>>traffic>>>uplink"
downName := "user>>>" + format.UserTag(tag, uuid) + ">>>traffic>>>downlink"
upCounter := c.shm.GetCounter(upName)
downCounter := c.shm.GetCounter(downName)
if reset {
if upCounter != nil {
up = upCounter.Set(0)
}
if downCounter != nil {
down = downCounter.Set(0)
}
} else {
if upCounter != nil {
up = upCounter.Value()
}
if downCounter != nil {
down = downCounter.Value()
func (x *Xray) GetUserTrafficSlice(tag string, reset bool) ([]panel.UserTraffic, error) {
trafficSlice := make([]panel.UserTraffic, 0)
x.users.mapLock.RLock()
defer x.users.mapLock.RUnlock()
if v, ok := x.dispatcher.Counter.Load(tag); ok {
c := v.(*counter.TrafficCounter)
c.Counters.Range(func(key, value interface{}) bool {
email := key.(string)
traffic := value.(*counter.TrafficStorage)
up := traffic.UpCounter.Load()
down := traffic.DownCounter.Load()
if up+down >= x.nodeReportMinTrafficBytes[tag] {
if reset {
traffic.UpCounter.Store(0)
traffic.DownCounter.Store(0)
}
trafficSlice = append(trafficSlice, panel.UserTraffic{
UID: x.users.uidMap[email],
Upload: up,
Download: down,
})
}
return true
})
if len(trafficSlice) == 0 {
return nil, nil
}
return trafficSlice, nil
}
return up, down
return nil, nil
}
func (c *Xray) AddUsers(p *vCore.AddUsersParams) (added int, err error) {
users := make([]*protocol.User, 0, len(p.Users))
c.users.mapLock.Lock()
defer c.users.mapLock.Unlock()
for i := range p.Users {
c.users.uidMap[format.UserTag(p.Tag, p.Users[i].Uuid)] = p.Users[i].Id
}
var users []*protocol.User
switch p.NodeInfo.Type {
case "vmess":
users = buildVmessUsers(p.Tag, p.Users)

View File

@@ -30,16 +30,29 @@ func init() {
// Xray Structure
type Xray struct {
access sync.Mutex
Server *core.Instance
ihm inbound.Manager
ohm outbound.Manager
shm statsFeature.Manager
dispatcher *dispatcher.DefaultDispatcher
access sync.Mutex
Server *core.Instance
ihm inbound.Manager
ohm outbound.Manager
shm statsFeature.Manager
dispatcher *dispatcher.DefaultDispatcher
users *UserMap
nodeReportMinTrafficBytes map[string]int64
}
type UserMap struct {
uidMap map[string]int
mapLock sync.RWMutex
}
func New(c *conf.CoreConfig) (vCore.Core, error) {
return &Xray{Server: getCore(c.XrayConfig)}, nil
return &Xray{
Server: getCore(c.XrayConfig),
users: &UserMap{
uidMap: make(map[string]int),
},
nodeReportMinTrafficBytes: make(map[string]int64),
}, nil
}
func parseConnectionConfig(c *conf.XrayConnectionConfig) (policy *coreConf.Policy) {