mirror of
https://github.com/wyx2685/V2bX.git
synced 2026-02-04 04:30:08 +00:00
fix: xray内核开启splice导致限速失效
This commit is contained in:
@@ -197,8 +197,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (*
|
|||||||
var lm *LinkManager
|
var lm *LinkManager
|
||||||
if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok {
|
if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok {
|
||||||
lm = &LinkManager{
|
lm = &LinkManager{
|
||||||
writers: make(map[*ManagedWriter]struct{}),
|
links: make(map[*ManagedWriter]buf.Reader),
|
||||||
readers: make(map[*ManagedReader]struct{}),
|
|
||||||
}
|
}
|
||||||
d.LinkManagers.Store(user.Email, lm)
|
d.LinkManagers.Store(user.Email, lm)
|
||||||
} else {
|
} else {
|
||||||
@@ -208,14 +207,10 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (*
|
|||||||
writer: uplinkWriter,
|
writer: uplinkWriter,
|
||||||
manager: lm,
|
manager: lm,
|
||||||
}
|
}
|
||||||
managedReader := &ManagedReader{
|
lm.AddLink(managedWriter, outboundLink.Reader)
|
||||||
reader: downlinkReader,
|
|
||||||
manager: lm,
|
|
||||||
}
|
|
||||||
lm.AddLink(managedWriter, managedReader)
|
|
||||||
inboundLink.Writer = managedWriter
|
inboundLink.Writer = managedWriter
|
||||||
outboundLink.Reader = managedReader
|
|
||||||
if w != nil {
|
if w != nil {
|
||||||
|
sessionInbound.CanSpliceCopy = 3
|
||||||
inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w)
|
inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w)
|
||||||
outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w)
|
outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w)
|
||||||
}
|
}
|
||||||
@@ -316,7 +311,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
|
|||||||
} else {
|
} else {
|
||||||
go func() {
|
go func() {
|
||||||
cReader := &cachedReader{
|
cReader := &cachedReader{
|
||||||
reader: outbound.Reader.(*ManagedReader),
|
reader: outbound.Reader.(*pipe.Reader),
|
||||||
}
|
}
|
||||||
outbound.Reader = cReader
|
outbound.Reader = cReader
|
||||||
result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
|
result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
|
||||||
@@ -396,8 +391,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
|
|||||||
var lm *LinkManager
|
var lm *LinkManager
|
||||||
if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok {
|
if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok {
|
||||||
lm = &LinkManager{
|
lm = &LinkManager{
|
||||||
writers: make(map[*ManagedWriter]struct{}),
|
links: make(map[*ManagedWriter]buf.Reader),
|
||||||
readers: make(map[*ManagedReader]struct{}),
|
|
||||||
}
|
}
|
||||||
d.LinkManagers.Store(user.Email, lm)
|
d.LinkManagers.Store(user.Email, lm)
|
||||||
} else {
|
} else {
|
||||||
@@ -407,14 +401,9 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
|
|||||||
writer: outbound.Writer,
|
writer: outbound.Writer,
|
||||||
manager: lm,
|
manager: lm,
|
||||||
}
|
}
|
||||||
managedReader := &ManagedReader{
|
|
||||||
reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader},
|
|
||||||
manager: lm,
|
|
||||||
}
|
|
||||||
lm.AddLink(managedWriter, managedReader)
|
|
||||||
outbound.Writer = managedWriter
|
outbound.Writer = managedWriter
|
||||||
outbound.Reader = managedReader
|
|
||||||
if w != nil {
|
if w != nil {
|
||||||
|
sessionInbound.CanSpliceCopy = 3
|
||||||
outbound.Writer = rate.NewRateLimitWriter(outbound.Writer, w)
|
outbound.Writer = rate.NewRateLimitWriter(outbound.Writer, w)
|
||||||
}
|
}
|
||||||
var t *counter.TrafficCounter
|
var t *counter.TrafficCounter
|
||||||
@@ -428,9 +417,10 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
|
|||||||
ts := t.GetCounter(user.Email)
|
ts := t.GetCounter(user.Email)
|
||||||
downcounter := &counter.XrayTrafficCounter{V: &ts.DownCounter}
|
downcounter := &counter.XrayTrafficCounter{V: &ts.DownCounter}
|
||||||
outbound.Reader = &CounterReader{
|
outbound.Reader = &CounterReader{
|
||||||
Reader: managedReader,
|
Reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader},
|
||||||
Counter: &ts.UpCounter,
|
Counter: &ts.UpCounter,
|
||||||
}
|
}
|
||||||
|
lm.AddLink(managedWriter, outbound.Reader)
|
||||||
outbound.Writer = &dispatcher.SizeStatWriter{
|
outbound.Writer = &dispatcher.SizeStatWriter{
|
||||||
Counter: downcounter,
|
Counter: downcounter,
|
||||||
Writer: outbound.Writer,
|
Writer: outbound.Writer,
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package dispatcher
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
sync "sync"
|
sync "sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/xtls/xray-core/common"
|
"github.com/xtls/xray-core/common"
|
||||||
"github.com/xtls/xray-core/common/buf"
|
"github.com/xtls/xray-core/common/buf"
|
||||||
@@ -22,56 +21,28 @@ func (w *ManagedWriter) Close() error {
|
|||||||
return common.Close(w.writer)
|
return common.Close(w.writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ManagedReader struct {
|
|
||||||
reader buf.TimeoutReader
|
|
||||||
manager *LinkManager
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ManagedReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
|
|
||||||
return r.reader.ReadMultiBuffer()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ManagedReader) ReadMultiBufferTimeout(t time.Duration) (buf.MultiBuffer, error) {
|
|
||||||
return r.reader.ReadMultiBufferTimeout(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ManagedReader) Interrupt() {
|
|
||||||
r.manager.RemoveReader(r)
|
|
||||||
common.Interrupt(r.reader)
|
|
||||||
}
|
|
||||||
|
|
||||||
type LinkManager struct {
|
type LinkManager struct {
|
||||||
writers map[*ManagedWriter]struct{}
|
links map[*ManagedWriter]buf.Reader
|
||||||
readers map[*ManagedReader]struct{}
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LinkManager) AddLink(writer *ManagedWriter, reader *ManagedReader) {
|
func (m *LinkManager) AddLink(writer *ManagedWriter, reader buf.Reader) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
m.writers[writer] = struct{}{}
|
m.links[writer] = reader
|
||||||
m.readers[reader] = struct{}{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LinkManager) RemoveWriter(writer *ManagedWriter) {
|
func (m *LinkManager) RemoveWriter(writer *ManagedWriter) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
delete(m.writers, writer)
|
delete(m.links, writer)
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LinkManager) RemoveReader(reader *ManagedReader) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
delete(m.readers, reader)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LinkManager) CloseAll() {
|
func (m *LinkManager) CloseAll() {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
for w := range m.writers {
|
for w, r := range m.links {
|
||||||
common.Close(w)
|
common.Close(w)
|
||||||
}
|
|
||||||
for r := range m.readers {
|
|
||||||
common.Interrupt(r)
|
common.Interrupt(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user