initial support for hysteria
update config
update api
change user email format
...
This commit is contained in:
yuzuki999
2023-06-08 22:46:33 +08:00
parent 52f9734278
commit 42bb7bc90f
30 changed files with 809 additions and 983 deletions

View File

@@ -1,74 +0,0 @@
package hy
import (
"context"
"crypto/tls"
"os"
"path/filepath"
"runtime"
"go.uber.org/zap"
"github.com/caddyserver/certmagic"
)
func acmeTLSConfig(domains []string, email string, disableHTTP bool, disableTLSALPN bool,
altHTTPPort int, altTLSALPNPort int,
) (*tls.Config, error) {
cfg := &certmagic.Config{
RenewalWindowRatio: certmagic.DefaultRenewalWindowRatio,
KeySource: certmagic.DefaultKeyGenerator,
Storage: &certmagic.FileStorage{Path: dataDir()},
Logger: zap.NewNop(),
}
issuer := certmagic.NewACMEIssuer(cfg, certmagic.ACMEIssuer{
CA: certmagic.LetsEncryptProductionCA,
TestCA: certmagic.LetsEncryptStagingCA,
Email: email,
Agreed: true,
DisableHTTPChallenge: disableHTTP,
DisableTLSALPNChallenge: disableTLSALPN,
AltHTTPPort: altHTTPPort,
AltTLSALPNPort: altTLSALPNPort,
Logger: zap.NewNop(),
})
cfg.Issuers = []certmagic.Issuer{issuer}
cache := certmagic.NewCache(certmagic.CacheOptions{
GetConfigForCert: func(cert certmagic.Certificate) (*certmagic.Config, error) {
return cfg, nil
},
Logger: zap.NewNop(),
})
cfg = certmagic.New(cache, *cfg)
err := cfg.ManageSync(context.Background(), domains)
if err != nil {
return nil, err
}
return cfg.TLSConfig(), nil
}
func homeDir() string {
home := os.Getenv("HOME")
if home == "" && runtime.GOOS == "windows" {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home = drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
}
if home == "" {
home = "."
}
return home
}
func dataDir() string {
baseDir := filepath.Join(homeDir(), ".local", "share")
if xdgData := os.Getenv("XDG_DATA_HOME"); xdgData != "" {
baseDir = xdgData
}
return filepath.Join(baseDir, "certmagic")
}

View File

@@ -1,13 +1,5 @@
package hy
import (
"errors"
"fmt"
"github.com/yosuke-furukawa/json5/encoding/json5"
"regexp"
"strconv"
)
const (
mbpsToBps = 125000
minSpeedBPS = 16384
@@ -27,281 +19,8 @@ const (
DefaultClientHopIntervalSec = 10
)
var rateStringRegexp = regexp.MustCompile(`^(\d+)\s*([KMGT]?)([Bb])ps$`)
type serverConfig struct {
Listen string `json:"listen"`
Protocol string `json:"protocol"`
ACME struct {
Domains []string `json:"domains"`
Email string `json:"email"`
DisableHTTPChallenge bool `json:"disable_http"`
DisableTLSALPNChallenge bool `json:"disable_tlsalpn"`
AltHTTPPort int `json:"alt_http_port"`
AltTLSALPNPort int `json:"alt_tlsalpn_port"`
} `json:"acme"`
CertFile string `json:"cert"`
KeyFile string `json:"key"`
// Optional below
Up string `json:"up"`
UpMbps int `json:"up_mbps"`
Down string `json:"down"`
DownMbps int `json:"down_mbps"`
DisableUDP bool `json:"disable_udp"`
ACL string `json:"acl"`
MMDB string `json:"mmdb"`
Obfs string `json:"obfs"`
Auth struct {
Mode string `json:"mode"`
Config json5.RawMessage `json:"config"`
} `json:"auth"`
ALPN string `json:"alpn"`
PrometheusListen string `json:"prometheus_listen"`
ReceiveWindowConn uint64 `json:"recv_window_conn"`
ReceiveWindowClient uint64 `json:"recv_window_client"`
MaxConnClient int `json:"max_conn_client"`
DisableMTUDiscovery bool `json:"disable_mtu_discovery"`
Resolver string `json:"resolver"`
ResolvePreference string `json:"resolve_preference"`
SOCKS5Outbound struct {
Server string `json:"server"`
User string `json:"user"`
Password string `json:"password"`
} `json:"socks5_outbound"`
BindOutbound struct {
Address string `json:"address"`
Device string `json:"device"`
} `json:"bind_outbound"`
}
func (c *serverConfig) Speed() (uint64, uint64, error) {
var up, down uint64
if len(c.Up) > 0 {
up = stringToBps(c.Up)
if up == 0 {
return 0, 0, errors.New("invalid speed format")
}
} else {
up = uint64(c.UpMbps) * mbpsToBps
}
if len(c.Down) > 0 {
down = stringToBps(c.Down)
if down == 0 {
return 0, 0, errors.New("invalid speed format")
}
} else {
down = uint64(c.DownMbps) * mbpsToBps
}
return up, down, nil
}
func (c *serverConfig) Check() error {
if len(c.Listen) == 0 {
return errors.New("missing listen address")
}
if len(c.ACME.Domains) == 0 && (len(c.CertFile) == 0 || len(c.KeyFile) == 0) {
return errors.New("need either ACME info or cert/key files")
}
if len(c.ACME.Domains) > 0 && (len(c.CertFile) > 0 || len(c.KeyFile) > 0) {
return errors.New("cannot use both ACME and cert/key files, they are mutually exclusive")
}
if up, down, err := c.Speed(); err != nil || (up != 0 && up < minSpeedBPS) || (down != 0 && down < minSpeedBPS) {
return errors.New("invalid speed")
}
if (c.ReceiveWindowConn != 0 && c.ReceiveWindowConn < 65536) ||
(c.ReceiveWindowClient != 0 && c.ReceiveWindowClient < 65536) {
return errors.New("invalid receive window size")
}
if c.MaxConnClient < 0 {
return errors.New("invalid max connections per client")
}
return nil
}
func (c *serverConfig) Fill() {
if len(c.ALPN) == 0 {
c.ALPN = DefaultALPN
}
if c.ReceiveWindowConn == 0 {
c.ReceiveWindowConn = DefaultStreamReceiveWindow
}
if c.ReceiveWindowClient == 0 {
c.ReceiveWindowClient = DefaultConnectionReceiveWindow
}
if c.MaxConnClient == 0 {
c.MaxConnClient = DefaultMaxIncomingStreams
}
if len(c.MMDB) == 0 {
c.MMDB = DefaultMMDBFilename
}
}
func (c *serverConfig) String() string {
return fmt.Sprintf("%+v", *c)
}
type Relay struct {
Listen string `json:"listen"`
Remote string `json:"remote"`
Timeout int `json:"timeout"`
}
func (r *Relay) Check() error {
if len(r.Listen) == 0 {
return errors.New("missing relay listen address")
}
if len(r.Remote) == 0 {
return errors.New("missing relay remote address")
}
if r.Timeout != 0 && r.Timeout < 4 {
return errors.New("invalid relay timeout")
}
return nil
}
type clientConfig struct {
Server string `json:"server"`
Protocol string `json:"protocol"`
Up string `json:"up"`
UpMbps int `json:"up_mbps"`
Down string `json:"down"`
DownMbps int `json:"down_mbps"`
// Optional below
Retry int `json:"retry"`
RetryInterval *int `json:"retry_interval"`
QuitOnDisconnect bool `json:"quit_on_disconnect"`
HandshakeTimeout int `json:"handshake_timeout"`
IdleTimeout int `json:"idle_timeout"`
HopInterval int `json:"hop_interval"`
SOCKS5 struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
DisableUDP bool `json:"disable_udp"`
User string `json:"user"`
Password string `json:"password"`
} `json:"socks5"`
HTTP struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
User string `json:"user"`
Password string `json:"password"`
Cert string `json:"cert"`
Key string `json:"key"`
} `json:"http"`
TUN struct {
Name string `json:"name"`
Timeout int `json:"timeout"`
MTU uint32 `json:"mtu"`
TCPSendBufferSize string `json:"tcp_sndbuf"`
TCPReceiveBufferSize string `json:"tcp_rcvbuf"`
TCPModerateReceiveBuffer bool `json:"tcp_autotuning"`
} `json:"tun"`
TCPRelays []Relay `json:"relay_tcps"`
TCPRelay Relay `json:"relay_tcp"` // deprecated, but we still support it for backward compatibility
UDPRelays []Relay `json:"relay_udps"`
UDPRelay Relay `json:"relay_udp"` // deprecated, but we still support it for backward compatibility
TCPTProxy struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
} `json:"tproxy_tcp"`
UDPTProxy struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
} `json:"tproxy_udp"`
TCPRedirect struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
} `json:"redirect_tcp"`
ACL string `json:"acl"`
MMDB string `json:"mmdb"`
Obfs string `json:"obfs"`
Auth []byte `json:"auth"`
AuthString string `json:"auth_str"`
ALPN string `json:"alpn"`
ServerName string `json:"server_name"`
Insecure bool `json:"insecure"`
CustomCA string `json:"ca"`
ReceiveWindowConn uint64 `json:"recv_window_conn"`
ReceiveWindow uint64 `json:"recv_window"`
DisableMTUDiscovery bool `json:"disable_mtu_discovery"`
FastOpen bool `json:"fast_open"`
LazyStart bool `json:"lazy_start"`
Resolver string `json:"resolver"`
ResolvePreference string `json:"resolve_preference"`
}
func (c *clientConfig) Speed() (uint64, uint64, error) {
var up, down uint64
if len(c.Up) > 0 {
up = stringToBps(c.Up)
if up == 0 {
return 0, 0, errors.New("invalid speed format")
}
} else {
up = uint64(c.UpMbps) * mbpsToBps
}
if len(c.Down) > 0 {
down = stringToBps(c.Down)
if down == 0 {
return 0, 0, errors.New("invalid speed format")
}
} else {
down = uint64(c.DownMbps) * mbpsToBps
}
return up, down, nil
}
func (c *clientConfig) Fill() {
if len(c.ALPN) == 0 {
c.ALPN = DefaultALPN
}
if c.ReceiveWindowConn == 0 {
c.ReceiveWindowConn = DefaultStreamReceiveWindow
}
if c.ReceiveWindow == 0 {
c.ReceiveWindow = DefaultConnectionReceiveWindow
}
if len(c.MMDB) == 0 {
c.MMDB = DefaultMMDBFilename
}
if c.IdleTimeout == 0 {
c.IdleTimeout = DefaultClientIdleTimeoutSec
}
if c.HopInterval == 0 {
c.HopInterval = DefaultClientHopIntervalSec
}
}
func (c *clientConfig) String() string {
return fmt.Sprintf("%+v", *c)
}
func stringToBps(s string) uint64 {
if s == "" {
return 0
}
m := rateStringRegexp.FindStringSubmatch(s)
if m == nil {
return 0
}
var n uint64
switch m[2] {
case "K":
n = 1 << 10
case "M":
n = 1 << 20
case "G":
n = 1 << 30
case "T":
n = 1 << 40
default:
n = 1
}
v, _ := strconv.ParseUint(m[1], 10, 64)
n = v * n
if m[3] == "b" {
// Bits, need to convert to bytes
n = n >> 3
}
return n
func SpeedTrans(upM, downM int) (uint64, uint64) {
up := uint64(upM) * mbpsToBps
down := uint64(downM) * mbpsToBps
return up, down
}

67
core/hy/counter.go Normal file
View File

@@ -0,0 +1,67 @@
package hy
import (
"github.com/apernet/hysteria/core/cs"
"sync"
"sync/atomic"
)
type UserTrafficCounter struct {
counters map[string]*counters
lock sync.RWMutex
}
type counters struct {
UpCounter atomic.Int64
DownCounter atomic.Int64
//ConnGauge atomic.Int64
}
func NewUserTrafficCounter() cs.TrafficCounter {
return new(UserTrafficCounter)
}
func (c *UserTrafficCounter) getCounters(auth string) *counters {
c.lock.RLock()
cts, ok := c.counters[auth]
c.lock.RUnlock()
if !ok {
cts = &counters{}
c.counters[auth] = cts
}
return cts
}
func (c *UserTrafficCounter) Rx(auth string, n int) {
cts := c.getCounters(auth)
cts.DownCounter.Add(int64(n))
}
func (c *UserTrafficCounter) Tx(auth string, n int) {
cts := c.getCounters(auth)
cts.UpCounter.Add(int64(n))
}
func (c *UserTrafficCounter) IncConn(_ string) {
/*cts := c.getCounters(auth)
cts.ConnGauge.Add(1)*/
return
}
func (c *UserTrafficCounter) DecConn(_ string) {
/*cts := c.getCounters(auth)
cts.ConnGauge.Add(1)*/
return
}
func (c *UserTrafficCounter) Reset(auth string) {
cts := c.getCounters(auth)
cts.UpCounter.Store(0)
cts.DownCounter.Store(0)
}
func (c *UserTrafficCounter) Delete(auth string) {
c.lock.Lock()
delete(c.counters, auth)
c.lock.Unlock()
}

7
core/hy/counter_test.go Normal file
View File

@@ -0,0 +1,7 @@
package hy
import "testing"
func TestUserTrafficCounter_Rx(t *testing.T) {
}

View File

@@ -1,306 +1,37 @@
package hy
import (
"crypto/tls"
"io"
"net"
"net/http"
"time"
"github.com/oschwald/geoip2-golang"
"github.com/quic-go/quic-go"
"github.com/apernet/hysteria/app/auth"
"github.com/apernet/hysteria/core/acl"
"github.com/apernet/hysteria/core/cs"
"github.com/apernet/hysteria/core/pktconns"
"github.com/apernet/hysteria/core/pmtud"
"github.com/apernet/hysteria/core/sockopt"
"github.com/apernet/hysteria/core/transport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/yosuke-furukawa/json5/encoding/json5"
"fmt"
"github.com/Yuzuki616/V2bX/conf"
"github.com/hashicorp/go-multierror"
"sync"
)
var serverPacketConnFuncFactoryMap = map[string]pktconns.ServerPacketConnFuncFactory{
"": pktconns.NewServerUDPConnFunc,
"udp": pktconns.NewServerUDPConnFunc,
"wechat": pktconns.NewServerWeChatConnFunc,
"wechat-video": pktconns.NewServerWeChatConnFunc,
"faketcp": pktconns.NewServerFakeTCPConnFunc,
type Hy struct {
servers sync.Map
}
func server(config *serverConfig) {
logrus.WithField("config", config.String()).Info("Server configuration loaded")
config.Fill() // Fill default values
// Resolver
if len(config.Resolver) > 0 {
err := setResolver(config.Resolver)
func New(_ *conf.CoreConfig) (*Hy, error) {
return &Hy{
servers: sync.Map{},
}, nil
}
func (h *Hy) Start() error {
return nil
}
func (h *Hy) Close() error {
var errs error
h.servers.Range(func(tag, s any) bool {
err := s.(*Server).Close()
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to set resolver")
errs = multierror.Append(errs, fmt.Errorf("close %s error: %s", tag, err))
}
return true
})
if errs != nil {
return errs
}
// Load TLS config
var tlsConfig *tls.Config
if len(config.ACME.Domains) > 0 {
// ACME mode
tc, err := acmeTLSConfig(config.ACME.Domains, config.ACME.Email,
config.ACME.DisableHTTPChallenge, config.ACME.DisableTLSALPNChallenge,
config.ACME.AltHTTPPort, config.ACME.AltTLSALPNPort)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to get a certificate with ACME")
}
tc.NextProtos = []string{config.ALPN}
tc.MinVersion = tls.VersionTLS13
tlsConfig = tc
} else {
// Local cert mode
kpl, err := newKeypairLoader(config.CertFile, config.KeyFile)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"cert": config.CertFile,
"key": config.KeyFile,
}).Fatal("Failed to load the certificate")
}
tlsConfig = &tls.Config{
GetCertificate: kpl.GetCertificateFunc(),
NextProtos: []string{config.ALPN},
MinVersion: tls.VersionTLS13,
}
}
// QUIC config
quicConfig := &quic.Config{
InitialStreamReceiveWindow: config.ReceiveWindowConn,
MaxStreamReceiveWindow: config.ReceiveWindowConn,
InitialConnectionReceiveWindow: config.ReceiveWindowClient,
MaxConnectionReceiveWindow: config.ReceiveWindowClient,
MaxIncomingStreams: int64(config.MaxConnClient),
MaxIdleTimeout: ServerMaxIdleTimeoutSec * time.Second,
KeepAlivePeriod: 0, // Keep alive should solely be client's responsibility
DisablePathMTUDiscovery: config.DisableMTUDiscovery,
EnableDatagrams: true,
}
if !quicConfig.DisablePathMTUDiscovery && pmtud.DisablePathMTUDiscovery {
logrus.Info("Path MTU Discovery is not yet supported on this platform")
}
// Auth
var authFunc cs.ConnectFunc
var err error
switch authMode := config.Auth.Mode; authMode {
case "", "none":
if len(config.Obfs) == 0 {
logrus.Warn("Neither authentication nor obfuscation is turned on. " +
"Your server could be used by anyone! Are you sure this is what you want?")
}
authFunc = func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) {
return true, "Welcome"
}
case "password", "passwords":
authFunc, err = auth.PasswordAuthFunc(config.Auth.Config)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to enable password authentication")
} else {
logrus.Info("Password authentication enabled")
}
case "external":
authFunc, err = auth.ExternalAuthFunc(config.Auth.Config)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to enable external authentication")
} else {
logrus.Info("External authentication enabled")
}
default:
logrus.WithField("mode", config.Auth.Mode).Fatal("Unsupported authentication mode")
}
connectFunc := func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) {
ok, msg := authFunc(addr, auth, sSend, sRecv)
if !ok {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"msg": msg,
}).Info("Authentication failed, client rejected")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
}).Info("Client connected")
}
return ok, msg
}
// Resolve preference
if len(config.ResolvePreference) > 0 {
pref, err := transport.ResolvePreferenceFromString(config.ResolvePreference)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to parse the resolve preference")
}
transport.DefaultServerTransport.ResolvePreference = pref
}
// SOCKS5 outbound
if config.SOCKS5Outbound.Server != "" {
transport.DefaultServerTransport.SOCKS5Client = transport.NewSOCKS5Client(config.SOCKS5Outbound.Server,
config.SOCKS5Outbound.User, config.SOCKS5Outbound.Password)
}
// Bind outbound
if config.BindOutbound.Device != "" {
iface, err := net.InterfaceByName(config.BindOutbound.Device)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to find the interface")
}
transport.DefaultServerTransport.LocalUDPIntf = iface
sockopt.BindDialer(transport.DefaultServerTransport.Dialer, iface)
}
if config.BindOutbound.Address != "" {
ip := net.ParseIP(config.BindOutbound.Address)
if ip == nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to parse the address")
}
transport.DefaultServerTransport.Dialer.LocalAddr = &net.TCPAddr{IP: ip}
transport.DefaultServerTransport.LocalUDPAddr = &net.UDPAddr{IP: ip}
}
// ACL
var aclEngine *acl.Engine
if len(config.ACL) > 0 {
aclEngine, err = acl.LoadFromFile(config.ACL, func(addr string) (*net.IPAddr, error) {
ipAddr, _, err := transport.DefaultServerTransport.ResolveIPAddr(addr)
return ipAddr, err
},
func() (*geoip2.Reader, error) {
return loadMMDBReader(config.MMDB)
})
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"file": config.ACL,
}).Fatal("Failed to parse ACL")
}
aclEngine.DefaultAction = acl.ActionDirect
}
// Prometheus
var trafficCounter cs.TrafficCounter
if len(config.PrometheusListen) > 0 {
promReg := prometheus.NewRegistry()
trafficCounter = NewPrometheusTrafficCounter(promReg)
go func() {
http.Handle("/metrics", promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}))
err := http.ListenAndServe(config.PrometheusListen, nil)
logrus.WithField("error", err).Fatal("Prometheus HTTP server error")
}()
}
// Packet conn
pktConnFuncFactory := serverPacketConnFuncFactoryMap[config.Protocol]
if pktConnFuncFactory == nil {
logrus.WithField("protocol", config.Protocol).Fatal("Unsupported protocol")
}
pktConnFunc := pktConnFuncFactory(config.Obfs)
pktConn, err := pktConnFunc(config.Listen)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"addr": config.Listen,
}).Fatal("Failed to listen on the UDP address")
}
// Server
up, down, _ := config.Speed()
server, err := cs.NewServer(tlsConfig, quicConfig, pktConn,
transport.DefaultServerTransport, up, down, config.DisableUDP, aclEngine,
connectFunc, disconnectFunc, tcpRequestFunc, tcpErrorFunc, udpRequestFunc, udpErrorFunc, trafficCounter)
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize server")
}
defer server.Close()
logrus.WithField("addr", config.Listen).Info("Server up and running")
err = server.Serve()
logrus.WithField("error", err).Fatal("Server shutdown")
}
func disconnectFunc(addr net.Addr, auth []byte, err error) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"error": err,
}).Info("Client disconnected")
}
func tcpRequestFunc(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
"action": actionToString(action, arg),
}).Debug("TCP request")
}
func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
"error": err,
}).Info("TCP error")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
}).Debug("TCP EOF")
}
}
func udpRequestFunc(addr net.Addr, auth []byte, sessionID uint32) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
}).Debug("UDP request")
}
func udpErrorFunc(addr net.Addr, auth []byte, sessionID uint32, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
"error": err,
}).Info("UDP error")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
}).Debug("UDP EOF")
}
}
func actionToString(action acl.Action, arg string) string {
switch action {
case acl.ActionDirect:
return "Direct"
case acl.ActionProxy:
return "Proxy"
case acl.ActionBlock:
return "Block"
case acl.ActionHijack:
return "Hijack to " + arg
default:
return "Unknown"
}
}
func parseServerConfig(cb []byte) (*serverConfig, error) {
var c serverConfig
err := json5.Unmarshal(cb, &c)
if err != nil {
return nil, err
}
return &c, c.Check()
return nil
}

34
core/hy/node.go Normal file
View File

@@ -0,0 +1,34 @@
package hy
import (
"errors"
"fmt"
"github.com/Yuzuki616/V2bX/api/panel"
"github.com/Yuzuki616/V2bX/conf"
"github.com/apernet/hysteria/core/cs"
)
func (h *Hy) AddNode(tag string, info *panel.NodeInfo, c *conf.ControllerConfig) error {
if info.Type != "hysteria" {
return errors.New("the core not support " + info.Type)
}
s := NewServer(tag)
err := s.runServer(info, c)
if err != nil {
return fmt.Errorf("run hy server error: %s", err)
}
h.servers.Store(tag, s)
return nil
}
func (h *Hy) DelNode(tag string) error {
if s, e := h.servers.Load(tag); e {
err := s.(*cs.Server).Close()
if err != nil {
return err
}
h.servers.Delete(tag)
return nil
}
return errors.New("the node is not have")
}

View File

@@ -1,71 +0,0 @@
package hy
import (
"github.com/apernet/hysteria/core/cs"
"github.com/prometheus/client_golang/prometheus"
)
type prometheusTrafficCounter struct {
reg *prometheus.Registry
upCounterVec *prometheus.CounterVec
downCounterVec *prometheus.CounterVec
connGaugeVec *prometheus.GaugeVec
counterMap map[string]counters
}
type counters struct {
UpCounter prometheus.Counter
DownCounter prometheus.Counter
ConnGauge prometheus.Gauge
}
func NewPrometheusTrafficCounter(reg *prometheus.Registry) cs.TrafficCounter {
c := &prometheusTrafficCounter{
reg: reg,
upCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "hysteria_traffic_uplink_bytes_total",
}, []string{"auth"}),
downCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "hysteria_traffic_downlink_bytes_total",
}, []string{"auth"}),
connGaugeVec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "hysteria_active_conn",
}, []string{"auth"}),
counterMap: make(map[string]counters),
}
reg.MustRegister(c.upCounterVec, c.downCounterVec, c.connGaugeVec)
return c
}
func (c *prometheusTrafficCounter) getCounters(auth string) counters {
cts, ok := c.counterMap[auth]
if !ok {
cts = counters{
UpCounter: c.upCounterVec.WithLabelValues(auth),
DownCounter: c.downCounterVec.WithLabelValues(auth),
ConnGauge: c.connGaugeVec.WithLabelValues(auth),
}
c.counterMap[auth] = cts
}
return cts
}
func (c *prometheusTrafficCounter) Rx(auth string, n int) {
cts := c.getCounters(auth)
cts.DownCounter.Add(float64(n))
}
func (c *prometheusTrafficCounter) Tx(auth string, n int) {
cts := c.getCounters(auth)
cts.UpCounter.Add(float64(n))
}
func (c *prometheusTrafficCounter) IncConn(auth string) {
cts := c.getCounters(auth)
cts.ConnGauge.Inc()
}
func (c *prometheusTrafficCounter) DecConn(auth string) {
cts := c.getCounters(auth)
cts.ConnGauge.Dec()
}

263
core/hy/server.go Normal file
View File

@@ -0,0 +1,263 @@
package hy
import (
"crypto/tls"
"errors"
"fmt"
"github.com/Yuzuki616/V2bX/api/panel"
"github.com/Yuzuki616/V2bX/conf"
"github.com/apernet/hysteria/core/sockopt"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/quic-go/quic-go"
"github.com/apernet/hysteria/core/acl"
"github.com/apernet/hysteria/core/cs"
"github.com/apernet/hysteria/core/pktconns"
"github.com/apernet/hysteria/core/pmtud"
"github.com/apernet/hysteria/core/transport"
"github.com/sirupsen/logrus"
)
var serverPacketConnFuncFactoryMap = map[string]pktconns.ServerPacketConnFuncFactory{
"": pktconns.NewServerUDPConnFunc,
"udp": pktconns.NewServerUDPConnFunc,
"wechat": pktconns.NewServerWeChatConnFunc,
"wechat-video": pktconns.NewServerWeChatConnFunc,
"faketcp": pktconns.NewServerFakeTCPConnFunc,
}
type Server struct {
tag string
counter UserTrafficCounter
users sync.Map
running atomic.Bool
*cs.Server
}
func NewServer(tag string) *Server {
return &Server{
tag: tag,
}
}
func (s *Server) runServer(node *panel.NodeInfo, c *conf.ControllerConfig) error {
if c.HyOptions == nil {
return errors.New("hy options is not vail")
}
// Resolver
if len(c.HyOptions.Resolver) > 0 {
err := setResolver(c.HyOptions.Resolver)
if err != nil {
return fmt.Errorf("set resolver error: %s", err)
}
}
// tls config
kpl, err := newKeypairLoader(c.CertConfig.CertFile, c.CertConfig.KeyFile)
if err != nil {
return fmt.Errorf("load cert error: %s", err)
}
tlsConfig := &tls.Config{
GetCertificate: kpl.GetCertificateFunc(),
NextProtos: []string{DefaultALPN},
MinVersion: tls.VersionTLS13,
}
// QUIC config
quicConfig := &quic.Config{
InitialStreamReceiveWindow: DefaultStreamReceiveWindow,
MaxStreamReceiveWindow: DefaultStreamReceiveWindow,
InitialConnectionReceiveWindow: DefaultConnectionReceiveWindow,
MaxConnectionReceiveWindow: DefaultConnectionReceiveWindow,
MaxIncomingStreams: int64(DefaultMaxIncomingStreams),
MaxIdleTimeout: ServerMaxIdleTimeoutSec * time.Second,
KeepAlivePeriod: 0, // Keep alive should solely be client's responsibility
DisablePathMTUDiscovery: false,
EnableDatagrams: true,
}
if !quicConfig.DisablePathMTUDiscovery && pmtud.DisablePathMTUDiscovery {
logrus.Info("Path MTU Discovery is not yet supported on this platform")
}
// Resolve preference
if len(c.HyOptions.ResolvePreference) > 0 {
pref, err := transport.ResolvePreferenceFromString(c.HyOptions.Resolver)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to parse the resolve preference")
}
transport.DefaultServerTransport.ResolvePreference = pref
}
/*// SOCKS5 outbound
if config.SOCKS5Outbound.Server != "" {
transport.DefaultServerTransport.SOCKS5Client = transport.NewSOCKS5Client(config.SOCKS5Outbound.Server,
config.SOCKS5Outbound.User, config.SOCKS5Outbound.Password)
}*/
// Bind outbound
if c.HyOptions.SendDevice != "" {
iface, err := net.InterfaceByName(c.HyOptions.SendDevice)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to find the interface")
}
transport.DefaultServerTransport.LocalUDPIntf = iface
sockopt.BindDialer(transport.DefaultServerTransport.Dialer, iface)
}
if c.SendIP != "" {
ip := net.ParseIP(c.SendIP)
if ip == nil {
logrus.WithFields(logrus.Fields{
"error": err,
}).Fatal("Failed to parse the address")
}
transport.DefaultServerTransport.Dialer.LocalAddr = &net.TCPAddr{IP: ip}
transport.DefaultServerTransport.LocalUDPAddr = &net.UDPAddr{IP: ip}
}
// ACL
var aclEngine *acl.Engine
/*if len(config.ACL) > 0 {
aclEngine, err = acl.LoadFromFile(config.ACL, func(addr string) (*net.IPAddr, error) {
ipAddr, _, err := transport.DefaultServerTransport.ResolveIPAddr(addr)
return ipAddr, err
},
func() (*geoip2.Reader, error) {
return loadMMDBReader(config.MMDB)
})
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"file": config.ACL,
}).Fatal("Failed to parse ACL")
}
aclEngine.DefaultAction = acl.ActionDirect
}*/
// Prometheus
trafficCounter := NewUserTrafficCounter()
// Packet conn
pktConnFuncFactory := serverPacketConnFuncFactoryMap[""]
if pktConnFuncFactory == nil {
return fmt.Errorf("unsopport protocol")
}
pktConnFunc := pktConnFuncFactory(node.HyObfs)
addr := fmt.Sprintf("%s:%d", c.ListenIP, node.Port)
pktConn, err := pktConnFunc(addr)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"addr": addr,
}).Fatal("Failed to listen on the UDP address")
}
// Server
up, down := SpeedTrans(node.UpMbps, node.DownMbps)
s.Server, err = cs.NewServer(tlsConfig, quicConfig, pktConn,
transport.DefaultServerTransport, up, down, false, aclEngine,
s.connectFunc, s.disconnectFunc, tcpRequestFunc, tcpErrorFunc, udpRequestFunc, udpErrorFunc, trafficCounter)
if err != nil {
return fmt.Errorf("new server error: %s", err)
}
logrus.WithField("addr", addr).Info("Server up and running")
go func() {
s.running.Store(true)
defer func() {
s.running.Store(false)
}()
err = s.Server.Serve()
if err != nil {
logrus.WithField("addr", addr).Errorf("serve error: %s", err)
}
}()
return nil
}
func (s *Server) authByUser(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string, string) {
if email, ok := s.users.Load(string(auth)); ok {
return true, email.(string), "Done"
}
return false, "", "Failed"
}
func (s *Server) connectFunc(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) {
ok, email, msg := s.authByUser(addr, auth, sSend, sRecv)
if !ok {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
}).Info("Authentication failed, client rejected")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"email": email,
}).Info("Client connected")
}
return ok, msg
}
func (s *Server) disconnectFunc(addr net.Addr, auth []byte, err error) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"error": err,
}).Info("Client disconnected")
}
func tcpRequestFunc(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
"action": actionToString(action, arg),
}).Debug("TCP request")
}
func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
"error": err,
}).Info("TCP error")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"dst": defaultIPMasker.Mask(reqAddr),
}).Debug("TCP EOF")
}
}
func udpRequestFunc(addr net.Addr, auth []byte, sessionID uint32) {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
}).Debug("UDP request")
}
func udpErrorFunc(addr net.Addr, auth []byte, sessionID uint32, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
"error": err,
}).Info("UDP error")
} else {
logrus.WithFields(logrus.Fields{
"src": defaultIPMasker.Mask(addr.String()),
"session": sessionID,
}).Debug("UDP EOF")
}
}
func actionToString(action acl.Action, arg string) string {
switch action {
case acl.ActionDirect:
return "Direct"
case acl.ActionProxy:
return "Proxy"
case acl.ActionBlock:
return "Block"
case acl.ActionHijack:
return "Hijack to " + arg
default:
return "Unknown"
}
}

25
core/hy/server_test.go Normal file
View File

@@ -0,0 +1,25 @@
package hy
import (
"github.com/Yuzuki616/V2bX/api/panel"
"github.com/Yuzuki616/V2bX/conf"
"testing"
)
func TestServer(t *testing.T) {
s := NewServer("test")
t.Log(s.runServer(&panel.NodeInfo{
Port: 11415,
UpMbps: 100,
DownMbps: 100,
HyObfs: "atresssdaaaadd",
}, &conf.ControllerConfig{
ListenIP: "127.0.0.1",
HyOptions: &conf.HyOptions{},
CertConfig: &conf.CertConfig{
CertFile: "../../test_data/1.pem",
KeyFile: "../../test_data/1.key",
},
}))
select {}
}

42
core/hy/user.go Normal file
View File

@@ -0,0 +1,42 @@
package hy
import (
"errors"
"github.com/Yuzuki616/V2bX/core"
)
func (h *Hy) AddUsers(p *core.AddUsersParams) (int, error) {
s, ok := h.servers.Load(p.Tag)
if !ok {
return 0, errors.New("the node not have")
}
u := &s.(*Server).users
for i := range p.UserInfo {
u.Store(p.UserInfo[i].Uuid, struct{}{})
}
return len(p.UserInfo), nil
}
func (h *Hy) GetUserTraffic(tag, uuid string, reset bool) (up int64, down int64) {
s, _ := h.servers.Load(tag)
c := &s.(*Server).counter
up = c.getCounters(uuid).UpCounter.Load()
down = c.getCounters(uuid).DownCounter.Load()
if reset {
c.Reset(uuid)
}
return
}
func (h *Hy) DelUsers(users []string, tag string) error {
v, e := h.servers.Load(tag)
if !e {
return errors.New("the node is not have")
}
s := v.(*Server)
for i := range users {
s.users.Delete(users[i])
s.counter.Delete(users[i])
}
return nil
}

View File

@@ -2,5 +2,5 @@
package imports
// not works for now
//import _ "github.com/Yuzuki616/V2bX/core/hy"
// not yet tested
import _ "github.com/Yuzuki616/V2bX/core/hy"

View File

@@ -17,6 +17,6 @@ type Core interface {
AddNode(tag string, info *panel.NodeInfo, config *conf.ControllerConfig) error
DelNode(tag string) error
AddUsers(p *AddUsersParams) (added int, err error)
GetUserTraffic(email string, reset bool) (up int64, down int64)
GetUserTraffic(tag, uuid string, reset bool) (up int64, down int64)
DelUsers(users []string, tag string) error
}

View File

@@ -20,7 +20,7 @@ func (c *Core) AddNode(tag string, info *panel.NodeInfo, config *conf.Controller
if err != nil {
return fmt.Errorf("add inbound error: %s", err)
}
outBoundConfig, err := builder.BuildOutbound(config, info, tag)
outBoundConfig, err := builder.BuildOutbound(config, tag)
if err != nil {
return fmt.Errorf("build outbound error: %s", err)
}

View File

@@ -40,9 +40,9 @@ func (c *Core) DelUsers(users []string, tag string) error {
return nil
}
func (c *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) {
upName := "user>>>" + email + ">>>traffic>>>uplink"
downName := "user>>>" + email + ">>>traffic>>>downlink"
func (c *Core) GetUserTraffic(tag, uuid string, reset bool) (up int64, down int64) {
upName := "user>>>" + builder.BuildUserTag(tag, uuid) + ">>>traffic>>>uplink"
downName := "user>>>" + builder.BuildUserTag(tag, uuid) + ">>>traffic>>>downlink"
upCounter := c.shm.GetCounter(upName)
downCounter := c.shm.GetCounter(downName)
if reset {
@@ -65,10 +65,10 @@ func (c *Core) GetUserTraffic(email string, reset bool) (up int64, down int64) {
func (c *Core) AddUsers(p *vCore.AddUsersParams) (added int, err error) {
users := make([]*protocol.User, 0, len(p.UserInfo))
switch p.NodeInfo.NodeType {
switch p.NodeInfo.Type {
case "v2ray":
if p.Config.EnableVless {
users = builder.BuildVlessUsers(p.Tag, p.UserInfo, p.Config.EnableXtls)
if p.Config.XrayOptions.EnableXtls {
users = builder.BuildVlessUsers(p.Tag, p.UserInfo, true)
} else {
users = builder.BuildVmessUsers(p.Tag, p.UserInfo)
}
@@ -80,7 +80,7 @@ func (c *Core) AddUsers(p *vCore.AddUsersParams) (added int, err error) {
p.NodeInfo.Cipher,
p.NodeInfo.ServerKey)
default:
return 0, fmt.Errorf("unsupported node type: %s", p.NodeInfo.NodeType)
return 0, fmt.Errorf("unsupported node type: %s", p.NodeInfo.Type)
}
man, err := c.GetUserManager(p.Tag)
if err != nil {