From 63d88843b66656765a110fd009754dc940a873f8 Mon Sep 17 00:00:00 2001 From: wyx2685 Date: Wed, 16 Jul 2025 15:49:17 +0900 Subject: [PATCH] =?UTF-8?q?test:=20Xray=E5=86=85=E6=A0=B8=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E7=94=A8=E6=88=B7=E6=97=B6=E5=B0=9D=E8=AF=95=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/xray/app/dispatcher/default.go | 14 +++++- core/xray/app/dispatcher/linkmanager.go | 61 +++++++++++++++++++++++++ core/xray/user.go | 1 + 3 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 core/xray/app/dispatcher/linkmanager.go diff --git a/core/xray/app/dispatcher/default.go b/core/xray/app/dispatcher/default.go index f8406df..dfa17af 100644 --- a/core/xray/app/dispatcher/default.go +++ b/core/xray/app/dispatcher/default.go @@ -102,8 +102,8 @@ type DefaultDispatcher struct { router routing.Router policy policy.Manager stats stats.Manager - dns dns.Client fdns dns.FakeDNSEngine + Wm *WriterManager } func init() { @@ -127,6 +127,9 @@ func (d *DefaultDispatcher) Init(config *Config, om outbound.Manager, router rou d.router = router d.policy = pm d.stats = sm + d.Wm = &WriterManager{ + writers: make(map[string]map[*ManagedWriter]struct{}), + } return nil } @@ -148,9 +151,14 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (* uplinkReader, uplinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...) + managedWriter := &ManagedWriter{ + writer: uplinkWriter, + manager: d.Wm, + } + inboundLink := &transport.Link{ Reader: downlinkReader, - Writer: uplinkWriter, + Writer: managedWriter, } outboundLink := &transport.Link{ @@ -214,6 +222,8 @@ func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network) (* } } } + managedWriter.email = user.Email + d.Wm.AddWriter(managedWriter) return inboundLink, outboundLink, limit, nil } diff --git a/core/xray/app/dispatcher/linkmanager.go b/core/xray/app/dispatcher/linkmanager.go new file mode 100644 index 0000000..d35d7c5 --- /dev/null +++ b/core/xray/app/dispatcher/linkmanager.go @@ -0,0 +1,61 @@ +package dispatcher + +import ( + sync "sync" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" +) + +type WriterManager struct { + writers map[string]map[*ManagedWriter]struct{} + mu sync.Mutex +} + +func (m *WriterManager) AddWriter(writer *ManagedWriter) { + m.mu.Lock() + defer m.mu.Unlock() + if _, exists := m.writers[writer.email]; !exists { + m.writers[writer.email] = make(map[*ManagedWriter]struct{}) + } + m.writers[writer.email][writer] = struct{}{} +} + +func (m *WriterManager) RemoveWriter(writer *ManagedWriter) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.writers[writer.email]; !exists { + return + } + delete(m.writers[writer.email], writer) +} + +func (m *WriterManager) RemoveWritersForUser(email string) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.writers[email]; !exists { + return + } + for writer := range m.writers[email] { + delete(m.writers[email], writer) + common.Close(writer.writer) + } + delete(m.writers, email) +} + +type ManagedWriter struct { + writer buf.Writer + email string + manager *WriterManager +} + +func (w *ManagedWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + return w.writer.WriteMultiBuffer(mb) +} + +func (w *ManagedWriter) Close() error { + w.manager.RemoveWriter(w) + return common.Close(w.writer) +} diff --git a/core/xray/user.go b/core/xray/user.go index b98e84b..2f1c241 100644 --- a/core/xray/user.go +++ b/core/xray/user.go @@ -43,6 +43,7 @@ func (c *Xray) DelUsers(users []panel.UserInfo, tag string, _ *panel.NodeInfo) e down = "user>>>" + user + ">>>traffic>>>downlink" c.shm.UnregisterCounter(up) c.shm.UnregisterCounter(down) + c.dispatcher.Wm.RemoveWritersForUser(user) } return nil }