mirror of
https://github.com/wyx2685/V2bX.git
synced 2026-02-04 04:30:08 +00:00
change project structure, fix online ip report bug
This commit is contained in:
@@ -5,8 +5,6 @@ package dispatcher
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/Yuzuki616/V2bX/app/limiter"
|
||||
"github.com/Yuzuki616/V2bX/app/rule"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/log"
|
||||
@@ -96,8 +94,8 @@ type DefaultDispatcher struct {
|
||||
stats stats.Manager
|
||||
dns dns.Client
|
||||
fdns dns.FakeDNSEngine
|
||||
Limiter *limiter.Limiter
|
||||
RuleManager *rule.Rule
|
||||
Limiter *Limiter
|
||||
RuleManager *Rule
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -121,8 +119,8 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou
|
||||
d.router = router
|
||||
d.policy = pm
|
||||
d.stats = sm
|
||||
d.Limiter = limiter.New()
|
||||
d.RuleManager = rule.New()
|
||||
d.Limiter = NewLimiter()
|
||||
d.RuleManager = NewRule()
|
||||
d.dns = dns
|
||||
return nil
|
||||
}
|
||||
|
||||
257
core/app/dispatcher/limiter.go
Normal file
257
core/app/dispatcher/limiter.go
Normal file
@@ -0,0 +1,257 @@
|
||||
// Package limiter is to control the links that go into the dispather
|
||||
package dispatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/Yuzuki616/V2bX/api/panel"
|
||||
"github.com/juju/ratelimit"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type UserInfo struct {
|
||||
UID int
|
||||
SpeedLimit uint64
|
||||
DeviceLimit int
|
||||
}
|
||||
|
||||
type InboundInfo struct {
|
||||
Tag string
|
||||
NodeSpeedLimit uint64
|
||||
UserInfo *sync.Map // Key: Uid value: UserInfo
|
||||
BucketHub *sync.Map // key: Uid, value: *ratelimit.Bucket
|
||||
UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool
|
||||
}
|
||||
|
||||
type Limiter struct {
|
||||
InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo
|
||||
}
|
||||
|
||||
func NewLimiter() *Limiter {
|
||||
return &Limiter{
|
||||
InboundInfo: new(sync.Map),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error {
|
||||
inboundInfo := &InboundInfo{
|
||||
Tag: tag,
|
||||
NodeSpeedLimit: nodeInfo.SpeedLimit,
|
||||
BucketHub: new(sync.Map),
|
||||
UserOnlineIP: new(sync.Map),
|
||||
}
|
||||
userMap := new(sync.Map)
|
||||
for i := range userList {
|
||||
/*if (*userList)[i].SpeedLimit == 0 {
|
||||
(*userList)[i].SpeedLimit = nodeInfo.SpeedLimit
|
||||
}
|
||||
if (*userList)[i].DeviceLimit == 0 {
|
||||
(*userList)[i].DeviceLimit = nodeInfo.DeviceLimit
|
||||
}*/
|
||||
userMap.Store(fmt.Sprintf("%s|%s|%d", tag, (userList)[i].V2rayUser.Email, (userList)[i].UID),
|
||||
UserInfo{
|
||||
UID: (userList)[i].UID,
|
||||
SpeedLimit: nodeInfo.SpeedLimit,
|
||||
DeviceLimit: nodeInfo.DeviceLimit,
|
||||
})
|
||||
}
|
||||
inboundInfo.UserInfo = userMap
|
||||
l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Limiter) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error {
|
||||
if value, ok := l.InboundInfo.Load(tag); ok {
|
||||
inboundInfo := value.(*InboundInfo)
|
||||
// Update User info
|
||||
for i := range updatedUserList {
|
||||
inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag,
|
||||
(updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID), UserInfo{
|
||||
UID: (updatedUserList)[i].UID,
|
||||
SpeedLimit: nodeInfo.SpeedLimit,
|
||||
DeviceLimit: nodeInfo.DeviceLimit,
|
||||
})
|
||||
inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag,
|
||||
(updatedUserList)[i].V2rayUser.Email, (updatedUserList)[i].UID)) // Delete old limiter bucket
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("no such inbound in limiter: %s", tag)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Limiter) DeleteInboundLimiter(tag string) error {
|
||||
l.InboundInfo.Delete(tag)
|
||||
return nil
|
||||
}
|
||||
|
||||
type UserIp struct {
|
||||
Uid int `json:"Uid"`
|
||||
IPs []string `json:"Ips"`
|
||||
}
|
||||
|
||||
func (l *Limiter) GetOnlineUserIp(tag string) ([]UserIp, error) {
|
||||
if value, ok := l.InboundInfo.Load(tag); ok {
|
||||
inboundInfo := value.(*InboundInfo)
|
||||
// Clear Speed Limiter bucket for users who are not online
|
||||
inboundInfo.BucketHub.Range(func(key, value interface{}) bool {
|
||||
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
|
||||
inboundInfo.BucketHub.Delete(key.(string))
|
||||
}
|
||||
return true
|
||||
})
|
||||
onlineUser := make([]UserIp, 0)
|
||||
var ipMap *sync.Map
|
||||
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
|
||||
ipMap = value.(*sync.Map)
|
||||
var ip []string
|
||||
ipMap.Range(func(key, v interface{}) bool {
|
||||
if v.(bool) {
|
||||
ip = append(ip, key.(string))
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(ip) > 0 {
|
||||
if u, ok := inboundInfo.UserInfo.Load(key.(string)); ok {
|
||||
onlineUser = append(onlineUser, UserIp{
|
||||
Uid: u.(UserInfo).UID,
|
||||
IPs: ip,
|
||||
})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(onlineUser) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return onlineUser, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("no such inbound in limiter: %s", tag)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIp) {
|
||||
if v, ok := l.InboundInfo.Load(tag); ok {
|
||||
inboundInfo := v.(*InboundInfo)
|
||||
//Clear old IP
|
||||
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
|
||||
inboundInfo.UserOnlineIP.Delete(key)
|
||||
return true
|
||||
})
|
||||
// Update User Online IP
|
||||
for i := range userIpList {
|
||||
ipMap := new(sync.Map)
|
||||
for _, userIp := range (userIpList)[i].IPs {
|
||||
ipMap.Store(userIp, false)
|
||||
}
|
||||
inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Limiter) ClearOnlineUserIP(tag string) {
|
||||
if v, ok := l.InboundInfo.Load(tag); ok {
|
||||
inboundInfo := v.(*InboundInfo)
|
||||
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
|
||||
inboundInfo.UserOnlineIP.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) {
|
||||
if value, ok := l.InboundInfo.Load(tag); ok {
|
||||
inboundInfo := value.(*InboundInfo)
|
||||
nodeLimit := inboundInfo.NodeSpeedLimit
|
||||
var userLimit uint64 = 0
|
||||
var deviceLimit = 0
|
||||
if v, ok := inboundInfo.UserInfo.Load(email); ok {
|
||||
u := v.(UserInfo)
|
||||
userLimit = u.SpeedLimit
|
||||
deviceLimit = u.DeviceLimit
|
||||
}
|
||||
ipMap := new(sync.Map)
|
||||
ipMap.Store(ip, true)
|
||||
// If any device is online
|
||||
if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok {
|
||||
ipMap := v.(*sync.Map)
|
||||
// If this ip is a new device
|
||||
if online, ok := ipMap.LoadOrStore(ip, true); !ok {
|
||||
counter := 0
|
||||
ipMap.Range(func(key, value interface{}) bool {
|
||||
counter++
|
||||
return true
|
||||
})
|
||||
if counter > deviceLimit && deviceLimit > 0 {
|
||||
ipMap.Delete(ip)
|
||||
return nil, false, true
|
||||
}
|
||||
} else {
|
||||
if !online.(bool) {
|
||||
ipMap.Store(ip, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
limit := determineRate(nodeLimit, userLimit) // If need the Speed limit
|
||||
if limit > 0 {
|
||||
limiter := ratelimit.NewBucketWithQuantum(time.Second, int64(limit), int64(limit)) // Byte/s
|
||||
if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok {
|
||||
bucket := v.(*ratelimit.Bucket)
|
||||
return bucket, true, false
|
||||
} else {
|
||||
return limiter, true, false
|
||||
}
|
||||
} else {
|
||||
return nil, false, false
|
||||
}
|
||||
} else {
|
||||
newError("Get Inbound Limiter information failed").AtDebug().WriteToLog()
|
||||
return nil, false, false
|
||||
}
|
||||
}
|
||||
|
||||
type Writer struct {
|
||||
writer buf.Writer
|
||||
limiter *ratelimit.Bucket
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func (l *Limiter) RateWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer {
|
||||
return &Writer{
|
||||
writer: writer,
|
||||
limiter: limiter,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Writer) Close() error {
|
||||
return common.Close(w.writer)
|
||||
}
|
||||
|
||||
func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
|
||||
w.limiter.Wait(int64(mb.Len()))
|
||||
return w.writer.WriteMultiBuffer(mb)
|
||||
}
|
||||
|
||||
// determineRate returns the minimum non-zero rate
|
||||
func determineRate(nodeLimit, userLimit uint64) (limit uint64) {
|
||||
if nodeLimit == 0 || userLimit == 0 {
|
||||
if nodeLimit > userLimit {
|
||||
return nodeLimit
|
||||
} else if nodeLimit < userLimit {
|
||||
return userLimit
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
} else {
|
||||
if nodeLimit > userLimit {
|
||||
return userLimit
|
||||
} else if nodeLimit < userLimit {
|
||||
return nodeLimit
|
||||
} else {
|
||||
return nodeLimit
|
||||
}
|
||||
}
|
||||
}
|
||||
105
core/app/dispatcher/rule.go
Normal file
105
core/app/dispatcher/rule.go
Normal file
@@ -0,0 +1,105 @@
|
||||
// Package rule is to control the audit rule behaviors
|
||||
package dispatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/Yuzuki616/V2bX/api/panel"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
)
|
||||
|
||||
type Rule struct {
|
||||
InboundRule *sync.Map // Key: Tag, Value: []api.DetectRule
|
||||
InboundProtocolRule *sync.Map // Key: Tag, Value: []string
|
||||
InboundDetectResult *sync.Map // key: Tag, Value: mapset.NewSet []api.DetectResult
|
||||
}
|
||||
|
||||
func NewRule() *Rule {
|
||||
return &Rule{
|
||||
InboundRule: new(sync.Map),
|
||||
InboundProtocolRule: new(sync.Map),
|
||||
InboundDetectResult: new(sync.Map),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Rule) UpdateRule(tag string, newRuleList []panel.DetectRule) error {
|
||||
if value, ok := r.InboundRule.LoadOrStore(tag, newRuleList); ok {
|
||||
oldRuleList := value.([]panel.DetectRule)
|
||||
if !reflect.DeepEqual(oldRuleList, newRuleList) {
|
||||
r.InboundRule.Store(tag, newRuleList)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Rule) UpdateProtocolRule(tag string, ruleList []string) error {
|
||||
if value, ok := r.InboundProtocolRule.LoadOrStore(tag, ruleList); ok {
|
||||
old := value.([]string)
|
||||
if !reflect.DeepEqual(old, ruleList) {
|
||||
r.InboundProtocolRule.Store(tag, ruleList)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Rule) GetDetectResult(tag string) ([]panel.DetectResult, error) {
|
||||
detectResult := make([]panel.DetectResult, 0)
|
||||
if value, ok := r.InboundDetectResult.LoadAndDelete(tag); ok {
|
||||
resultSet := value.(mapset.Set)
|
||||
it := resultSet.Iterator()
|
||||
for result := range it.C {
|
||||
detectResult = append(detectResult, result.(panel.DetectResult))
|
||||
}
|
||||
}
|
||||
return detectResult, nil
|
||||
}
|
||||
|
||||
func (r *Rule) Detect(tag string, destination string, email string) (reject bool) {
|
||||
reject = false
|
||||
var hitRuleID = -1
|
||||
// If we have some rule for this inbound
|
||||
if value, ok := r.InboundRule.Load(tag); ok {
|
||||
ruleList := value.([]panel.DetectRule)
|
||||
for _, r := range ruleList {
|
||||
if r.Pattern.Match([]byte(destination)) {
|
||||
hitRuleID = r.ID
|
||||
reject = true
|
||||
break
|
||||
}
|
||||
}
|
||||
// If we hit some rule
|
||||
if reject && hitRuleID != -1 {
|
||||
l := strings.Split(email, "|")
|
||||
uid, err := strconv.Atoi(l[len(l)-1])
|
||||
if err != nil {
|
||||
newError(fmt.Sprintf("Record illegal behavior failed! Cannot find user's uid: %s", email)).AtDebug().WriteToLog()
|
||||
return reject
|
||||
}
|
||||
newSet := mapset.NewSetWith(panel.DetectResult{UID: uid, RuleID: hitRuleID})
|
||||
// If there are any hit history
|
||||
if v, ok := r.InboundDetectResult.LoadOrStore(tag, newSet); ok {
|
||||
resultSet := v.(mapset.Set)
|
||||
// If this is a new record
|
||||
if resultSet.Add(panel.DetectResult{UID: uid, RuleID: hitRuleID}) {
|
||||
r.InboundDetectResult.Store(tag, resultSet)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return reject
|
||||
}
|
||||
func (r *Rule) ProtocolDetect(tag string, protocol string) bool {
|
||||
if value, ok := r.InboundProtocolRule.Load(tag); ok {
|
||||
ruleList := value.([]string)
|
||||
for _, r := range ruleList {
|
||||
if r == protocol {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -3,8 +3,8 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/Yuzuki616/V2bX/api"
|
||||
"github.com/Yuzuki616/V2bX/app/limiter"
|
||||
"github.com/Yuzuki616/V2bX/api/panel"
|
||||
"github.com/Yuzuki616/V2bX/core/app/dispatcher"
|
||||
"github.com/xtls/xray-core/core"
|
||||
"github.com/xtls/xray-core/features/inbound"
|
||||
)
|
||||
@@ -29,20 +29,20 @@ func (p *Core) AddInbound(config *core.InboundHandlerConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Core) AddInboundLimiter(tag string, nodeInfo *api.NodeInfo, userList []api.UserInfo) error {
|
||||
func (p *Core) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, userList []panel.UserInfo) error {
|
||||
err := p.dispatcher.Limiter.AddInboundLimiter(tag, nodeInfo, userList)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *Core) GetInboundLimiter(tag string) (*limiter.InboundInfo, error) {
|
||||
func (p *Core) GetInboundLimiter(tag string) (*dispatcher.InboundInfo, error) {
|
||||
limit, ok := p.dispatcher.Limiter.InboundInfo.Load(tag)
|
||||
if ok {
|
||||
return limit.(*limiter.InboundInfo), nil
|
||||
return limit.(*dispatcher.InboundInfo), nil
|
||||
}
|
||||
return nil, fmt.Errorf("not found limiter")
|
||||
}
|
||||
|
||||
func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *api.NodeInfo, updatedUserList []api.UserInfo) error {
|
||||
func (p *Core) UpdateInboundLimiter(tag string, nodeInfo *panel.NodeInfo, updatedUserList []panel.UserInfo) error {
|
||||
err := p.dispatcher.Limiter.UpdateInboundLimiter(tag, nodeInfo, updatedUserList)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"github.com/Yuzuki616/V2bX/api"
|
||||
"github.com/Yuzuki616/V2bX/api/panel"
|
||||
)
|
||||
|
||||
func (p *Core) UpdateRule(tag string, newRuleList []api.DetectRule) error {
|
||||
func (p *Core) UpdateRule(tag string, newRuleList []panel.DetectRule) error {
|
||||
return p.dispatcher.RuleManager.UpdateRule(tag, newRuleList)
|
||||
}
|
||||
|
||||
@@ -13,6 +13,6 @@ func (p *Core) UpdateProtocolRule(tag string, newRuleList []string) error {
|
||||
return p.dispatcher.RuleManager.UpdateProtocolRule(tag, newRuleList)
|
||||
}
|
||||
|
||||
func (p *Core) GetDetectResult(tag string) ([]api.DetectResult, error) {
|
||||
func (p *Core) GetDetectResult(tag string) ([]panel.DetectResult, error) {
|
||||
return p.dispatcher.RuleManager.GetDetectResult(tag)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/Yuzuki616/V2bX/app/limiter"
|
||||
"github.com/Yuzuki616/V2bX/core/app/dispatcher"
|
||||
"github.com/xtls/xray-core/common/protocol"
|
||||
"github.com/xtls/xray-core/features/stats"
|
||||
"github.com/xtls/xray-core/proxy"
|
||||
@@ -74,11 +74,11 @@ func (p *Core) GetUserTraffic(email string) (up int64, down int64) {
|
||||
return up, down
|
||||
}
|
||||
|
||||
func (p *Core) GetOnlineIps(tag string) ([]limiter.UserIp, error) {
|
||||
func (p *Core) GetOnlineIps(tag string) ([]dispatcher.UserIp, error) {
|
||||
return p.dispatcher.Limiter.GetOnlineUserIp(tag)
|
||||
}
|
||||
|
||||
func (p *Core) UpdateOnlineIps(tag string, ips []limiter.UserIp) {
|
||||
func (p *Core) UpdateOnlineIps(tag string, ips []dispatcher.UserIp) {
|
||||
p.dispatcher.Limiter.UpdateOnlineUserIP(tag, ips)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user