From 65f2de55ea7a7d90ee3c8f472930fbda97c1a52a Mon Sep 17 00:00:00 2001 From: wyx2685 Date: Thu, 18 Sep 2025 20:39:18 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=B5=81=E9=87=8F=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E4=B8=BAnull=EF=BC=8Cxray=E6=97=A0=E6=B3=95=E6=96=AD=E5=BC=80?= =?UTF-8?q?=E5=A4=B1=E6=95=88=E7=94=A8=E6=88=B7=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/xray/app/dispatcher/default.go | 61 +++++++++++----- core/xray/app/dispatcher/linkmanager.go | 96 ++++++++++++++----------- core/xray/user.go | 12 +++- 3 files changed, 109 insertions(+), 60 deletions(-) diff --git a/core/xray/app/dispatcher/default.go b/core/xray/app/dispatcher/default.go index c402a44..d12fba6 100644 --- a/core/xray/app/dispatcher/default.go +++ b/core/xray/app/dispatcher/default.go @@ -102,13 +102,13 @@ func (r *cachedReader) Interrupt() { // DefaultDispatcher is a default implementation of Dispatcher. type DefaultDispatcher struct { - ohm outbound.Manager - router routing.Router - policy policy.Manager - stats stats.Manager - fdns dns.FakeDNSEngine - Wm *WriterManager - Counter sync.Map + ohm outbound.Manager + router routing.Router + policy policy.Manager + stats stats.Manager + fdns dns.FakeDNSEngine + Counter sync.Map + LinkManagers sync.Map // map[string]*LinkManager } func init() { @@ -132,9 +132,6 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou d.router = router d.policy = pm d.stats = sm - d.Wm = &WriterManager{ - writers: make(map[string]map[*ManagedWriter]struct{}), - } return nil } @@ -197,13 +194,27 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (* common.Interrupt(inboundLink.Reader) return nil, nil, nil, errors.New("Limited ", user.Email, " by conn or ip") } + var lm *LinkManager + if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok { + lm = &LinkManager{ + writers: make(map[*ManagedWriter]struct{}), + readers: make(map[*ManagedReader]struct{}), + } + d.LinkManagers.Store(user.Email, lm) + } else { + lm = lmloaded.(*LinkManager) + } managedWriter := &ManagedWriter{ writer: uplinkWriter, - email: user.Email, - manager: d.Wm, + manager: lm, } - d.Wm.AddWriter(managedWriter) + managedReader := &ManagedReader{ + reader: downlinkReader, + manager: lm, + } + lm.AddLink(managedWriter, managedReader) inboundLink.Writer = managedWriter + outboundLink.Reader = managedReader if w != nil { inboundLink.Writer = rate.NewRateLimitWriter(inboundLink.Writer, w) outboundLink.Writer = rate.NewRateLimitWriter(outboundLink.Writer, w) @@ -305,7 +316,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin } else { go func() { cReader := &cachedReader{ - reader: outbound.Reader.(*pipe.Reader), + reader: outbound.Reader.(*ManagedReader), } outbound.Reader = cReader result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) @@ -382,13 +393,27 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De common.Interrupt(outbound.Reader) return errors.New("Limited ", user.Email, " by conn or ip") } + var lm *LinkManager + if lmloaded, ok := d.LinkManagers.Load(user.Email); !ok { + lm = &LinkManager{ + writers: make(map[*ManagedWriter]struct{}), + readers: make(map[*ManagedReader]struct{}), + } + d.LinkManagers.Store(user.Email, lm) + } else { + lm = lmloaded.(*LinkManager) + } managedWriter := &ManagedWriter{ writer: outbound.Writer, - email: user.Email, - manager: d.Wm, + manager: lm, } - d.Wm.AddWriter(managedWriter) + managedReader := &ManagedReader{ + reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader}, + manager: lm, + } + lm.AddLink(managedWriter, managedReader) outbound.Writer = managedWriter + outbound.Reader = managedReader if w != nil { outbound.Writer = rate.NewRateLimitWriter(outbound.Writer, w) } @@ -403,7 +428,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De ts := t.GetCounter(user.Email) downcounter := &counter.XrayTrafficCounter{V: &ts.DownCounter} outbound.Reader = &CounterReader{ - Reader: &buf.TimeoutWrapperReader{Reader: outbound.Reader}, + Reader: managedReader, Counter: &ts.UpCounter, } outbound.Writer = &dispatcher.SizeStatWriter{ diff --git a/core/xray/app/dispatcher/linkmanager.go b/core/xray/app/dispatcher/linkmanager.go index d35d7c5..66d1630 100644 --- a/core/xray/app/dispatcher/linkmanager.go +++ b/core/xray/app/dispatcher/linkmanager.go @@ -2,53 +2,15 @@ package dispatcher import ( sync "sync" + "time" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" ) -type WriterManager struct { - writers map[string]map[*ManagedWriter]struct{} - mu sync.Mutex -} - -func (m *WriterManager) AddWriter(writer *ManagedWriter) { - m.mu.Lock() - defer m.mu.Unlock() - if _, exists := m.writers[writer.email]; !exists { - m.writers[writer.email] = make(map[*ManagedWriter]struct{}) - } - m.writers[writer.email][writer] = struct{}{} -} - -func (m *WriterManager) RemoveWriter(writer *ManagedWriter) { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.writers[writer.email]; !exists { - return - } - delete(m.writers[writer.email], writer) -} - -func (m *WriterManager) RemoveWritersForUser(email string) { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.writers[email]; !exists { - return - } - for writer := range m.writers[email] { - delete(m.writers[email], writer) - common.Close(writer.writer) - } - delete(m.writers, email) -} - type ManagedWriter struct { writer buf.Writer - email string - manager *WriterManager + manager *LinkManager } func (w *ManagedWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -59,3 +21,57 @@ func (w *ManagedWriter) Close() error { w.manager.RemoveWriter(w) return common.Close(w.writer) } + +type ManagedReader struct { + reader buf.TimeoutReader + manager *LinkManager +} + +func (r *ManagedReader) ReadMultiBuffer() (buf.MultiBuffer, error) { + return r.reader.ReadMultiBuffer() +} + +func (r *ManagedReader) ReadMultiBufferTimeout(t time.Duration) (buf.MultiBuffer, error) { + return r.reader.ReadMultiBufferTimeout(t) +} + +func (r *ManagedReader) Interrupt() { + r.manager.RemoveReader(r) + common.Interrupt(r.reader) +} + +type LinkManager struct { + writers map[*ManagedWriter]struct{} + readers map[*ManagedReader]struct{} + mu sync.Mutex +} + +func (m *LinkManager) AddLink(writer *ManagedWriter, reader *ManagedReader) { + m.mu.Lock() + defer m.mu.Unlock() + m.writers[writer] = struct{}{} + m.readers[reader] = struct{}{} +} + +func (m *LinkManager) RemoveWriter(writer *ManagedWriter) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.writers, writer) +} + +func (m *LinkManager) RemoveReader(reader *ManagedReader) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.readers, reader) +} + +func (m *LinkManager) CloseAll() { + m.mu.Lock() + defer m.mu.Unlock() + for w := range m.writers { + common.Close(w) + } + for r := range m.readers { + common.Interrupt(r) + } +} diff --git a/core/xray/user.go b/core/xray/user.go index 4103317..a81bcb1 100644 --- a/core/xray/user.go +++ b/core/xray/user.go @@ -8,6 +8,7 @@ import ( "github.com/InazumaV/V2bX/common/counter" "github.com/InazumaV/V2bX/common/format" vCore "github.com/InazumaV/V2bX/core" + "github.com/InazumaV/V2bX/core/xray/app/dispatcher" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/proxy" ) @@ -43,8 +44,15 @@ func (c *Xray) DelUsers(users []panel.UserInfo, tag string, _ *panel.NodeInfo) e return err } delete(c.users.uidMap, user) - c.dispatcher.Counter.Delete(user) - c.dispatcher.Wm.RemoveWritersForUser(user) + if v, ok := c.dispatcher.Counter.Load(tag); ok { + tc := v.(*counter.TrafficCounter) + tc.Delete(user) + } + if v, ok := c.dispatcher.LinkManagers.Load(user); ok { + lm := v.(*dispatcher.LinkManager) + lm.CloseAll() + c.dispatcher.LinkManagers.Delete(user) + } } return nil }