refactor limiter
fix getLink bug
add connection limit
move limit config to ControllerConfig
del dynamic speed limit (next version will be re add)
del online ip sync (next version will be re add)
This commit is contained in:
yuzuki999
2023-05-16 09:15:29 +08:00
parent 2d7aaef066
commit 15c36a9580
35 changed files with 564 additions and 617 deletions

View File

@@ -5,6 +5,8 @@ package dispatcher
import (
"context"
"fmt"
"github.com/Yuzuki616/V2bX/common/rate"
"github.com/Yuzuki616/V2bX/limiter"
routingSession "github.com/xtls/xray-core/features/routing/session"
"strings"
"sync"
@@ -89,14 +91,12 @@ func (r *cachedReader) Interrupt() {
// DefaultDispatcher is a default implementation of Dispatcher.
type DefaultDispatcher struct {
ohm outbound.Manager
router routing.Router
policy policy.Manager
stats stats.Manager
dns dns.Client
fdns dns.FakeDNSEngine
Limiter *Limiter
RuleManager *Rule
ohm outbound.Manager
router routing.Router
policy policy.Manager
stats stats.Manager
dns dns.Client
fdns dns.FakeDNSEngine
}
func init() {
@@ -121,8 +121,6 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou
d.policy = pm
d.stats = sm
d.dns = dns
d.Limiter = NewLimiter()
d.RuleManager = NewRule()
return nil
}
@@ -139,7 +137,7 @@ func (*DefaultDispatcher) Start() error {
// Close implements common.Closable.
func (*DefaultDispatcher) Close() error { return nil }
func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link) {
func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, *limiter.Limiter, error) {
downOpt := pipe.OptionsFromContext(ctx)
upOpt := downOpt
@@ -226,21 +224,31 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn
if sessionInbound != nil {
user = sessionInbound.User
}
var limit *limiter.Limiter
if user != nil && len(user.Email) > 0 {
// Speed Limit and Device Limit
bucket, ok, reject := d.Limiter.CheckSpeedAndDeviceLimit(sessionInbound.Tag, user.Email, sessionInbound.Source.Address.IP().String())
if reject {
newError("Devices reach the limit: ", user.Email).AtError().WriteToLog()
var err error
limit, err = limiter.GetLimiter(sessionInbound.Tag)
if err != nil {
newError("Get limit info error: ", err).AtError().WriteToLog()
common.Close(outboundLink.Writer)
common.Close(inboundLink.Writer)
common.Interrupt(outboundLink.Reader)
common.Interrupt(inboundLink.Reader)
return nil, nil
return nil, nil, nil, newError("Get limit info error: ", err)
}
if ok {
inboundLink.Writer = d.Limiter.RateWriter(inboundLink.Writer, bucket)
outboundLink.Writer = d.Limiter.RateWriter(outboundLink.Writer, bucket)
// Speed Limit and Device Limit
w, reject := limit.CheckLimit(user.Email, sessionInbound.Source.Address.IP().String())
if reject {
newError("Limited ", user.Email, " by conn or ip").AtWarning().WriteToLog()
common.Close(outboundLink.Writer)
common.Close(inboundLink.Writer)
common.Interrupt(outboundLink.Reader)
common.Interrupt(inboundLink.Reader)
return nil, nil, nil, newError("Limited ", user.Email, " by conn or ip")
}
if w != nil {
inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w)
outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w)
}
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
@@ -263,7 +271,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sn
}
}
return inboundLink, outboundLink
return inboundLink, outboundLink, limit, nil
}
func (d *DefaultDispatcher) shouldOverride(ctx context.Context, result SniffResult, request session.SniffingRequest, destination net.Destination) bool {
@@ -313,11 +321,13 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
content = new(session.Content)
ctx = session.ContextWithContent(ctx, content)
}
sniffingRequest := content.SniffingRequest
inbound, outbound := d.getLink(ctx, destination.Network, sniffingRequest)
inbound, outbound, l, err := d.getLink(ctx, destination.Network, sniffingRequest)
if err != nil {
return nil, err
}
if !sniffingRequest.Enabled {
go d.routedDispatch(ctx, outbound, destination, "")
go d.routedDispatch(ctx, outbound, destination, l)
} else {
go func() {
cReader := &cachedReader{
@@ -338,7 +348,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
ob.Target = destination
}
}
d.routedDispatch(ctx, outbound, destination, content.Protocol)
d.routedDispatch(ctx, outbound, destination, l)
}()
}
return inbound, nil
@@ -360,7 +370,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
}
sniffingRequest := content.SniffingRequest
if !sniffingRequest.Enabled {
go d.routedDispatch(ctx, outbound, destination, content.Protocol)
go d.routedDispatch(ctx, outbound, destination, nil)
} else {
go func() {
cReader := &cachedReader{
@@ -381,10 +391,9 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
ob.Target = destination
}
}
d.routedDispatch(ctx, outbound, destination, content.Protocol)
d.routedDispatch(ctx, outbound, destination, nil)
}()
}
return nil
}
@@ -434,7 +443,7 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw
return contentResult, contentErr
}
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, protocol string) {
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination, l *limiter.Limiter) {
ob := session.OutboundFromContext(ctx)
if hosts, ok := d.dns.(dns.HostsLookup); ok && destination.Address.Family().IsDomain() {
proxied := hosts.LookupHosts(ob.Target.String())
@@ -455,9 +464,22 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
sessionInbound := session.InboundFromContext(ctx)
// Whether the inbound connection contains a user
if sessionInbound.User != nil {
if d.RuleManager.Detect(sessionInbound.Tag, destination.String(), protocol) {
if l == nil {
var err error
l, err = limiter.GetLimiter(sessionInbound.Tag)
if err != nil {
newError("Get limiter error: ", err).AtError().WriteToLog()
common.Close(link.Writer)
common.Interrupt(link.Reader)
return
}
} else {
defer func() {
l.ConnLimiter.DelConnCount(sessionInbound.User.Email, sessionInbound.Source.Address.IP().String())
}()
}
if l.CheckDomainRule(destination.String()) {
newError(fmt.Sprintf("User %s access %s reject by rule", sessionInbound.User.Email, destination.String())).AtError().WriteToLog()
newError("destination is reject by rule")
common.Close(link.Writer)
common.Interrupt(link.Reader)
return

View File

@@ -1,305 +0,0 @@
package dispatcher
import (
"fmt"
"github.com/Yuzuki616/V2bX/api/panel"
"github.com/juju/ratelimit"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"io"
"sync"
"time"
)
type UserLimitInfo struct {
UID int
SpeedLimit int
DynamicSpeedLimit int
ExpireTime int64
}
type InboundInfo struct {
Tag string
NodeSpeedLimit int
NodeDeviceLimit int
UserLimitInfo *sync.Map // Key: Uid value: UserLimitInfo
SpeedLimiter *sync.Map // key: Uid, value: *ratelimit.Bucket
UserOnlineIP *sync.Map // Key: Uid Value: *sync.Map: Key: IP, Value: bool
}
type Limiter struct {
InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo
}
func NewLimiter() *Limiter {
return &Limiter{
InboundInfo: new(sync.Map),
}
}
func (l *Limiter) AddInboundLimiter(tag string, nodeInfo *panel.NodeInfo, users []panel.UserInfo) error {
inboundInfo := &InboundInfo{
Tag: tag,
NodeSpeedLimit: nodeInfo.SpeedLimit,
NodeDeviceLimit: nodeInfo.DeviceLimit,
UserLimitInfo: new(sync.Map),
SpeedLimiter: new(sync.Map),
UserOnlineIP: new(sync.Map),
}
for i := range users {
if users[i].SpeedLimit != 0 {
userLimit := &UserLimitInfo{
UID: users[i].Id,
SpeedLimit: users[i].SpeedLimit,
ExpireTime: 0,
}
inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, users[i].Uuid, users[i].Id), userLimit)
}
}
l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info
return nil
}
func (l *Limiter) UpdateInboundLimiter(tag string, added []panel.UserInfo, deleted []panel.UserInfo) error {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
for i := range deleted {
inboundInfo.UserLimitInfo.Delete(fmt.Sprintf("%s|%s|%d", tag,
(deleted)[i].Uuid, (deleted)[i].Id))
}
for i := range added {
if added[i].SpeedLimit != 0 {
userLimit := &UserLimitInfo{
UID: added[i].Id,
SpeedLimit: added[i].SpeedLimit,
ExpireTime: 0,
}
inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag,
(added)[i].Uuid, (added)[i].Id), userLimit)
}
}
} else {
return fmt.Errorf("no such inbound in limiter: %s", tag)
}
return nil
}
func (l *Limiter) DeleteInboundLimiter(tag string) error {
l.InboundInfo.Delete(tag)
return nil
}
func (l *Limiter) AddDynamicSpeedLimit(tag string, userInfo *panel.UserInfo, limit int, expire int64) error {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
userLimit := &UserLimitInfo{
DynamicSpeedLimit: limit,
ExpireTime: time.Now().Add(time.Duration(expire) * time.Second).Unix(),
}
inboundInfo.UserLimitInfo.Store(fmt.Sprintf("%s|%s|%d", tag, userInfo.Uuid, userInfo.Id), userLimit)
return nil
} else {
return fmt.Errorf("no such inbound in limiter: %s", tag)
}
}
type UserIpList struct {
Uid int `json:"Uid"`
IpList []string `json:"Ips"`
}
func (l *Limiter) ListOnlineUserIp(tag string) ([]UserIpList, error) {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
onlineUser := make([]UserIpList, 0)
var ipMap *sync.Map
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
ipMap = value.(*sync.Map)
var ip []string
ipMap.Range(func(key, v interface{}) bool {
if v.(bool) {
ip = append(ip, key.(string))
}
return true
})
if len(ip) > 0 {
if u, ok := inboundInfo.UserLimitInfo.Load(key.(string)); ok {
onlineUser = append(onlineUser, UserIpList{
Uid: u.(*UserLimitInfo).UID,
IpList: ip,
})
}
}
return true
})
if len(onlineUser) == 0 {
return nil, nil
}
return onlineUser, nil
} else {
return nil, fmt.Errorf("no such inbound in limiter: %s", tag)
}
}
func (l *Limiter) UpdateOnlineUserIP(tag string, userIpList []UserIpList) {
if v, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := v.(*InboundInfo)
//Clear old IP
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
inboundInfo.UserOnlineIP.Delete(key)
return true
})
// Update User Online IP
for i := range userIpList {
ipMap := new(sync.Map)
for _, userIp := range (userIpList)[i].IpList {
ipMap.Store(userIp, false)
}
inboundInfo.UserOnlineIP.Store((userIpList)[i].Uid, ipMap)
}
inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool {
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
inboundInfo.SpeedLimiter.Delete(key.(string))
}
return true
})
}
}
func (l *Limiter) ClearOnlineUserIpAndSpeedLimiter(tag string) {
if v, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := v.(*InboundInfo)
inboundInfo.SpeedLimiter.Range(func(key, value interface{}) bool {
if _, exists := inboundInfo.UserOnlineIP.Load(key.(string)); !exists {
inboundInfo.SpeedLimiter.Delete(key.(string))
}
return true
})
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
inboundInfo.UserOnlineIP.Delete(key)
return true
})
}
}
func (l *Limiter) CheckSpeedAndDeviceLimit(tag string, email string, ip string) (speedLimiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
nodeLimit := inboundInfo.NodeSpeedLimit
userLimit := 0
if v, ok := inboundInfo.UserLimitInfo.Load(email); ok {
u := v.(*UserLimitInfo)
if u.ExpireTime < time.Now().Unix() && u.ExpireTime != 0 {
if u.SpeedLimit != 0 {
userLimit = u.SpeedLimit
u.DynamicSpeedLimit = 0
u.ExpireTime = 0
} else {
inboundInfo.UserLimitInfo.Delete(email)
}
} else {
userLimit = determineSpeedLimit(u.SpeedLimit, u.DynamicSpeedLimit)
}
}
ipMap := new(sync.Map)
ipMap.Store(ip, true)
// If any device is online
if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok {
ipMap := v.(*sync.Map)
// If this ip is a new device
if online, ok := ipMap.LoadOrStore(ip, true); !ok {
counter := 0
ipMap.Range(func(key, value interface{}) bool {
counter++
return true
})
if counter > inboundInfo.NodeDeviceLimit && inboundInfo.NodeDeviceLimit > 0 {
ipMap.Delete(ip)
return nil, false, true
}
} else {
if !online.(bool) {
ipMap.Store(ip, true)
}
}
}
limit := int64(determineSpeedLimit(nodeLimit, userLimit)) * 1000000 / 8 // If you need the Speed limit
if limit > 0 {
limiter := ratelimit.NewBucketWithQuantum(time.Second, limit, limit) // Byte/s
if v, ok := inboundInfo.SpeedLimiter.LoadOrStore(email, limiter); ok {
return v.(*ratelimit.Bucket), true, false
} else {
inboundInfo.SpeedLimiter.Store(email, limiter)
return limiter, true, false
}
} else {
return nil, false, false
}
} else {
newError("Get Inbound Limiter information failed").AtDebug().WriteToLog()
return nil, false, false
}
}
type Writer struct {
writer buf.Writer
limiter *ratelimit.Bucket
w io.Writer
}
func (l *Limiter) RateWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer {
return &Writer{
writer: writer,
limiter: limiter,
}
}
func (w *Writer) Close() error {
return common.Close(w.writer)
}
func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.limiter.Wait(int64(mb.Len()))
return w.writer.WriteMultiBuffer(mb)
}
// determineSpeedLimit returns the minimum non-zero rate
func determineSpeedLimit(limit1, limit2 int) (limit int) {
if limit1 == 0 || limit2 == 0 {
if limit1 > limit2 {
return limit1
} else if limit1 < limit2 {
return limit2
} else {
return 0
}
} else {
if limit1 > limit2 {
return limit2
} else if limit1 < limit2 {
return limit1
} else {
return limit1
}
}
}
func determineDeviceLimit(nodeLimit, userLimit int) (limit int) {
if nodeLimit == 0 || userLimit == 0 {
if nodeLimit > userLimit {
return nodeLimit
} else if nodeLimit < userLimit {
return userLimit
} else {
return 0
}
} else {
if nodeLimit > userLimit {
return userLimit
} else if nodeLimit < userLimit {
return nodeLimit
} else {
return nodeLimit
}
}
}

View File

@@ -1,42 +0,0 @@
package dispatcher
import (
"github.com/Yuzuki616/V2bX/api/panel"
"reflect"
"sync"
)
type Rule struct {
Rule *sync.Map // Key: Tag, Value: *panel.DetectRule
}
func NewRule() *Rule {
return &Rule{
Rule: new(sync.Map),
}
}
func (r *Rule) UpdateRule(tag string, newRuleList []panel.DestinationRule) error {
if value, ok := r.Rule.LoadOrStore(tag, newRuleList); ok {
oldRuleList := value.([]panel.DestinationRule)
if !reflect.DeepEqual(oldRuleList, newRuleList) {
r.Rule.Store(tag, newRuleList)
}
}
return nil
}
func (r *Rule) Detect(tag string, destination string, protocol string) (reject bool) {
reject = false
// If we have some rule for this inbound
if value, ok := r.Rule.Load(tag); ok {
ruleList := value.([]panel.DestinationRule)
for i := range ruleList {
if ruleList[i].Pattern.Match([]byte(destination)) {
reject = true
break
}
}
}
return reject
}