From 9483526f3c85e02119def5d82a00d4a8c7c2a238 Mon Sep 17 00:00:00 2001 From: wyx2685 Date: Fri, 19 Sep 2025 00:44:21 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20xray=E5=86=85=E6=A0=B8=E5=BC=80=E5=90=AF?= =?UTF-8?q?splice=E5=AF=BC=E8=87=B4=E9=99=90=E9=80=9F=E5=A4=B1=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/xray/app/dispatcher/default.go | 26 +++++----------- core/xray/app/dispatcher/linkmanager.go | 41 ++++--------------------- 2 files changed, 14 insertions(+), 53 deletions(-) diff --git a/core/xray/app/dispatcher/default.go b/core/xray/app/dispatcher/default.go index d12fba6..a9022e7 100644 --- a/core/xray/app/dispatcher/default.go +++ b/core/xray/app/dispatcher/default.go @@ -197,8 +197,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (* var lm *LinkManager if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok { lm = &LinkManager{ - writers: make(map[*ManagedWriter]struct{}), - readers: make(map[*ManagedReader]struct{}), + links: make(map[*ManagedWriter]buf.Reader), } d.LinkManagers.Store(user.Email, lm) } else { @@ -208,14 +207,10 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (* writer: uplinkWriter, manager: lm, } - managedReader := &ManagedReader{ - reader: downlinkReader, - manager: lm, - } - lm.AddLink(managedWriter, managedReader) + lm.AddLink(managedWriter, outboundLink.Reader) inboundLink.Writer = managedWriter - outboundLink.Reader = managedReader if w != nil { + sessionInbound.CanSpliceCopy = 3 inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w) outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w) } @@ -316,7 +311,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin } else { go func() { cReader := &cachedReader{ - reader: outbound.Reader.(*ManagedReader), + reader: outbound.Reader.(*pipe.Reader), } outbound.Reader = cReader result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) @@ -396,8 +391,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De var lm *LinkManager if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok { lm = &LinkManager{ - writers: make(map[*ManagedWriter]struct{}), - readers: make(map[*ManagedReader]struct{}), + links: make(map[*ManagedWriter]buf.Reader), } d.LinkManagers.Store(user.Email, lm) } else { @@ -407,14 +401,9 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De writer: outbound.Writer, manager: lm, } - managedReader := &ManagedReader{ - reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader}, - manager: lm, - } - lm.AddLink(managedWriter, managedReader) outbound.Writer = managedWriter - outbound.Reader = managedReader if w != nil { + sessionInbound.CanSpliceCopy = 3 outbound.Writer = rate.NewRateLimitWriter(outbound.Writer, w) } var t *counter.TrafficCounter @@ -428,9 +417,10 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De ts := t.GetCounter(user.Email) downcounter := &counter.XrayTrafficCounter{V: &ts.DownCounter} outbound.Reader = &CounterReader{ - Reader: managedReader, + Reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader}, Counter: &ts.UpCounter, } + lm.AddLink(managedWriter, outbound.Reader) outbound.Writer = &dispatcher.SizeStatWriter{ Counter: downcounter, Writer: outbound.Writer, diff --git a/core/xray/app/dispatcher/linkmanager.go b/core/xray/app/dispatcher/linkmanager.go index 66d1630..7f260e7 100644 --- a/core/xray/app/dispatcher/linkmanager.go +++ b/core/xray/app/dispatcher/linkmanager.go @@ -2,7 +2,6 @@ package dispatcher import ( sync "sync" - "time" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" @@ -22,56 +21,28 @@ func (w *ManagedWriter) Close() error { return common.Close(w.writer) } -type ManagedReader struct { - reader buf.TimeoutReader - manager *LinkManager -} - -func (r *ManagedReader) ReadMultiBuffer() (buf.MultiBuffer, error) { - return r.reader.ReadMultiBuffer() -} - -func (r *ManagedReader) ReadMultiBufferTimeout(t time.Duration) (buf.MultiBuffer, error) { - return r.reader.ReadMultiBufferTimeout(t) -} - -func (r *ManagedReader) Interrupt() { - r.manager.RemoveReader(r) - common.Interrupt(r.reader) -} - type LinkManager struct { - writers map[*ManagedWriter]struct{} - readers map[*ManagedReader]struct{} - mu sync.Mutex + links map[*ManagedWriter]buf.Reader + mu sync.Mutex } -func (m *LinkManager) AddLink(writer *ManagedWriter, reader *ManagedReader) { +func (m *LinkManager) AddLink(writer *ManagedWriter, reader buf.Reader) { m.mu.Lock() defer m.mu.Unlock() - m.writers[writer] = struct{}{} - m.readers[reader] = struct{}{} + m.links[writer] = reader } func (m *LinkManager) RemoveWriter(writer *ManagedWriter) { m.mu.Lock() defer m.mu.Unlock() - delete(m.writers, writer) -} - -func (m *LinkManager) RemoveReader(reader *ManagedReader) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.readers, reader) + delete(m.links, writer) } func (m *LinkManager) CloseAll() { m.mu.Lock() defer m.mu.Unlock() - for w := range m.writers { + for w, r := range m.links { common.Close(w) - } - for r := range m.readers { common.Interrupt(r) } }