test: Add vless encryption

This commit is contained in:
wyx2685
2025-09-12 14:26:58 +09:00
parent 0824bf7a4e
commit 10f66b57ea
7 changed files with 171 additions and 41 deletions

View File

@@ -0,0 +1,37 @@
package dispatcher
import (
"sync/atomic"
"time"
"github.com/xtls/xray-core/common/buf"
)
var _ buf.TimeoutReader = (*CounterReader)(nil)
type CounterReader struct {
Reader buf.TimeoutReader
Counter *atomic.Int64
}
func (c *CounterReader) ReadMultiBufferTimeout(time.Duration) (buf.MultiBuffer, error) {
mb, err := c.Reader.ReadMultiBufferTimeout(time.Second)
if err != nil {
return nil, err
}
if mb.Len() > 0 {
c.Counter.Add(int64(mb.Len()))
}
return mb, nil
}
func (c *CounterReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
mb, err := c.Reader.ReadMultiBuffer()
if err != nil {
return nil, err
}
if mb.Len() > 0 {
c.Counter.Add(int64(mb.Len()))
}
return mb, nil
}

View File

@@ -37,7 +37,7 @@ var errSniffingTimeout = errors.New("timeout on sniffing")
type cachedReader struct {
sync.Mutex
reader *pipe.Reader
reader buf.TimeoutReader
cache buf.MultiBuffer
}
@@ -95,7 +95,9 @@ func (r *cachedReader) Interrupt() {
r.cache = buf.ReleaseMulti(r.cache)
}
r.Unlock()
r.reader.Interrupt()
if p, ok := r.reader.(*pipe.Reader); ok {
p.Interrupt()
}
}
// DefaultDispatcher is a default implementation of Dispatcher.
@@ -168,7 +170,6 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (*
var user *protocol.MemoryUser
if sessionInbound != nil {
user = sessionInbound.User
//sessionInbound.CanSpliceCopy = 3
}
var limit *limiter.Limiter
@@ -353,12 +354,70 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
content = new(session.Content)
ctx = session.ContextWithContent(ctx, content)
}
sessionInbound := session.InboundFromContext(ctx)
var user *protocol.MemoryUser
if sessionInbound != nil {
user = sessionInbound.User
}
var limit *limiter.Limiter
var err error
if user != nil && len(user.Email) > 0 {
limit, err = limiter.GetLimiter(sessionInbound.Tag)
if err != nil {
errors.LogInfo(ctx, "get limiter ", sessionInbound.Tag, " error: ", err)
common.Close(outbound.Writer)
common.Interrupt(outbound.Reader)
return errors.New("get limiter ", sessionInbound.Tag, " error: ", err)
}
// Speed Limit and Device Limit
w, reject := limit.CheckLimit(user.Email,
sessionInbound.Source.Address.IP().String(),
destination.Network == net.Network_TCP,
sessionInbound.Source.Network == net.Network_TCP)
if reject {
errors.LogInfo(ctx, "Limited ", user.Email, " by conn or ip")
common.Close(outbound.Writer)
common.Interrupt(outbound.Reader)
return errors.New("Limited ", user.Email, " by conn or ip")
}
managedWriter := &ManagedWriter{
writer: outbound.Writer,
email: user.Email,
manager: d.Wm,
}
d.Wm.AddWriter(managedWriter)
outbound.Writer = managedWriter
if w != nil {
outbound.Writer = rate.NewRateLimitWriter(outbound.Writer, w)
}
var t *counter.TrafficCounter
if c, ok := d.Counter.Load(sessionInbound.Tag); !ok {
t = counter.NewTrafficCounter()
d.Counter.Store(sessionInbound.Tag, t)
} else {
t = c.(*counter.TrafficCounter)
}
ts := t.GetCounter(user.Email)
downcounter := &counter.XrayTrafficCounter{V: &ts.DownCounter}
outbound.Reader = &CounterReader{
Reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader},
Counter: &ts.UpCounter,
}
outbound.Writer = &dispatcher.SizeStatWriter{
Counter: downcounter,
Writer: outbound.Writer,
}
}
sniffingRequest := content.SniffingRequest
if !sniffingRequest.Enabled {
d.routedDispatch(ctx, outbound, destination, nil, "")
d.routedDispatch(ctx, outbound, destination, limit, "")
} else {
cReader := &cachedReader{
reader: outbound.Reader.(*pipe.Reader),
reader: outbound.Reader.(buf.TimeoutReader),
}
outbound.Reader = cReader
result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
@@ -383,7 +442,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
ob.Target = destination
}
}
d.routedDispatch(ctx, outbound, destination, nil, content.Protocol)
d.routedDispatch(ctx, outbound, destination, limit, content.Protocol)
}
return nil
@@ -518,6 +577,9 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
handler = h
} else {
errors.LogWarning(ctx, "non existing outTag: ", outTag)
common.Close(link.Writer)
common.Interrupt(link.Reader)
return // DO NOT CHANGE: the traffic shouldn't be processed by default outbound if the specified outbound tag doesn't exist (yet), e.g., VLESS Reverse Proxy
}
} else {
errors.LogInfo(ctx, "default route for ", destination)

View File

@@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
"encoding/json"
@@ -171,8 +172,27 @@ func buildV2ray(config *conf.Options, nodeInfo *panel.NodeInfo, inbound *coreCon
inbound.Settings = (*json.RawMessage)(&s)
} else {
var err error
decryption := "none"
if nodeInfo.VAllss.Encryption != "" {
switch nodeInfo.VAllss.Encryption {
case "mlkem768x25519plus":
encSettings := nodeInfo.VAllss.EncryptionSettings
parts := []string{
"mlkem768x25519plus",
encSettings.Mode,
encSettings.Ticket,
}
if encSettings.ServerPadding != "" {
parts = append(parts, encSettings.ServerPadding)
}
parts = append(parts, encSettings.PrivateKey)
decryption = strings.Join(parts, ".")
default:
return fmt.Errorf("vless decryption method %s is not support", nodeInfo.VAllss.Encryption)
}
}
s, err := json.Marshal(&coreConf.VLessInboundConfig{
Decryption: "none",
Decryption: decryption,
})
if err != nil {
return fmt.Errorf("marshal vless config error: %s", err)