From e1cef053be195c4e5a80805a238af3cc12199c74 Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 16 Jun 2023 00:14:19 +0800 Subject: [PATCH] server/proxy: simplify the code (#3488) --- pkg/nathole/controller.go | 1 + server/proxy/http.go | 21 ++++- server/proxy/https.go | 24 +++-- server/proxy/proxy.go | 192 ++++++++++++++++---------------------- server/proxy/stcp.go | 23 +++-- server/proxy/sudp.go | 23 +++-- server/proxy/tcp.go | 43 +++++---- server/proxy/tcpmux.go | 24 +++-- server/proxy/udp.go | 36 ++++--- server/proxy/xtcp.go | 21 ++++- 10 files changed, 235 insertions(+), 173 deletions(-) diff --git a/pkg/nathole/controller.go b/pkg/nathole/controller.go index 6f97455..82e2b20 100644 --- a/pkg/nathole/controller.go +++ b/pkg/nathole/controller.go @@ -130,6 +130,7 @@ func (c *Controller) ListenClient(name string, sk string, allowUsers []string) c } c.mu.Lock() defer c.mu.Unlock() + // TODO(fatedier): return error if name already exists c.clientCfgs[name] = cfg return cfg.sidCh } diff --git a/server/proxy/http.go b/server/proxy/http.go index 31742b7..432dbc1 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -17,10 +17,10 @@ package proxy import ( "io" "net" + "reflect" "strings" libio "github.com/fatedier/golib/io" - "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/util/limit" @@ -30,6 +30,10 @@ import ( "github.com/fatedier/frp/server/metrics" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.HTTPProxyConf{}), NewHTTPProxy) +} + type HTTPProxy struct { *BaseProxy cfg *config.HTTPProxyConf @@ -37,6 +41,17 @@ type HTTPProxy struct { closeFuncs []func() } +func NewHTTPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.HTTPProxyConf) + if !ok { + return nil + } + return &HTTPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *HTTPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl routeConfig := vhost.RouteConfig{ @@ -137,10 +152,6 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *HTTPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) { xl := pxy.xl rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr) diff --git a/server/proxy/https.go b/server/proxy/https.go index 1fb579e..74f0e7c 100644 --- a/server/proxy/https.go +++ b/server/proxy/https.go @@ -15,20 +15,34 @@ package proxy import ( + "reflect" "strings" - "golang.org/x/time/rate" - "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.HTTPSProxyConf{}), NewHTTPSProxy) +} + type HTTPSProxy struct { *BaseProxy cfg *config.HTTPSProxyConf } +func NewHTTPSProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.HTTPSProxyConf) + if !ok { + return nil + } + return &HTTPSProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) { xl := pxy.xl routeConfig := &vhost.RouteConfig{} @@ -67,7 +81,7 @@ func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) { addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, pxy.serverCfg.VhostHTTPSPort)) } - pxy.startListenHandler(pxy, HandleUserTCPConnection) + pxy.startCommonTCPListenersHandler() remoteAddr = strings.Join(addrs, ",") return } @@ -76,10 +90,6 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *HTTPSProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index b2d326a..9ccab5a 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "net" + "reflect" "strconv" "sync" "time" @@ -36,6 +37,12 @@ import ( "github.com/fatedier/frp/server/metrics" ) +var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{} + +func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) { + proxyFactoryRegistry[proxyConfType] = factory +} + type GetWorkConnFn func() (net.Conn, error) type Proxy interface { @@ -63,6 +70,7 @@ type BaseProxy struct { limiter *rate.Limiter userInfo plugin.UserInfo loginMsg *msg.Login + pxyConf config.ProxyConf mu sync.RWMutex xl *xlog.Logger @@ -93,6 +101,10 @@ func (pxy *BaseProxy) GetLoginMsg() *msg.Login { return pxy.loginMsg } +func (pxy *BaseProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *BaseProxy) Close() { xl := xlog.FromContextSafe(pxy.ctx) xl.Info("proxy closing") @@ -155,10 +167,8 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, return } -// startListenHandler start a goroutine handler for each listener. -// p: p will just be passed to handler(Proxy, utilnet.Conn). -// handler: each proxy type can set different handler function to deal with connections accepted from listeners. -func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) { +// startCommonTCPListenersHandler start a goroutine handler for each listener. +func (pxy *BaseProxy) startCommonTCPListenersHandler() { xl := xlog.FromContextSafe(pxy.ctx) for _, listener := range pxy.listeners { go func(l net.Listener) { @@ -187,12 +197,72 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, return } xl.Info("get a user connection [%s]", c.RemoteAddr().String()) - go handler(p, c, pxy.serverCfg) + go pxy.handleUserTCPConnection(c) } }(listener) } } +// HandleUserTCPConnection is used for incoming user TCP connections. +func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { + xl := xlog.FromContextSafe(pxy.Context()) + defer userConn.Close() + + serverCfg := pxy.serverCfg + cfg := pxy.pxyConf.GetBaseConfig() + // server plugin hook + rc := pxy.GetResourceController() + content := &plugin.NewUserConnContent{ + User: pxy.GetUserInfo(), + ProxyName: pxy.GetName(), + ProxyType: cfg.ProxyType, + RemoteAddr: userConn.RemoteAddr().String(), + } + _, err := rc.PluginManager.NewUserConn(content) + if err != nil { + xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err) + return + } + + // try all connections from the pool + workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) + if err != nil { + return + } + defer workConn.Close() + + var local io.ReadWriteCloser = workConn + xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression) + if cfg.UseEncryption { + local, err = libio.WithEncryption(local, []byte(serverCfg.Token)) + if err != nil { + xl.Error("create encryption stream error: %v", err) + return + } + } + if cfg.UseCompression { + local = libio.WithCompression(local) + } + + if pxy.GetLimiter() != nil { + local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error { + return local.Close() + }) + } + + xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), + workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) + + name := pxy.GetName() + proxyType := cfg.ProxyType + metrics.Server.OpenConnection(name, proxyType) + inCount, outCount, _ := libio.Join(local, userConn) + metrics.Server.CloseConnection(name, proxyType) + metrics.Server.AddTrafficIn(name, proxyType, inCount) + metrics.Server.AddTrafficOut(name, proxyType, outCount) + xl.Debug("join connections closed") +} + func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login, ) (pxy Proxy, err error) { @@ -216,114 +286,18 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso ctx: xlog.NewContext(ctx, xl), userInfo: userInfo, loginMsg: loginMsg, + pxyConf: pxyConf, } - switch cfg := pxyConf.(type) { - case *config.TCPProxyConf: - basePxy.usedPortsNum = 1 - pxy = &TCPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.TCPMuxProxyConf: - pxy = &TCPMuxProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.HTTPProxyConf: - pxy = &HTTPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.HTTPSProxyConf: - pxy = &HTTPSProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.UDPProxyConf: - basePxy.usedPortsNum = 1 - pxy = &UDPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.STCPProxyConf: - pxy = &STCPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.XTCPProxyConf: - pxy = &XTCPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - case *config.SUDPProxyConf: - pxy = &SUDPProxy{ - BaseProxy: &basePxy, - cfg: cfg, - } - default: + + factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)] + if factory == nil { return pxy, fmt.Errorf("proxy type not support") } - return -} - -// HandleUserTCPConnection is used for incoming user TCP connections. -// It can be used for tcp, http, https type. -func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) { - xl := xlog.FromContextSafe(pxy.Context()) - defer userConn.Close() - - // server plugin hook - rc := pxy.GetResourceController() - content := &plugin.NewUserConnContent{ - User: pxy.GetUserInfo(), - ProxyName: pxy.GetName(), - ProxyType: pxy.GetConf().GetBaseConfig().ProxyType, - RemoteAddr: userConn.RemoteAddr().String(), + pxy = factory(&basePxy, pxyConf) + if pxy == nil { + return nil, fmt.Errorf("proxy not created") } - _, err := rc.PluginManager.NewUserConn(content) - if err != nil { - xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err) - return - } - - // try all connections from the pool - workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) - if err != nil { - return - } - defer workConn.Close() - - var local io.ReadWriteCloser = workConn - cfg := pxy.GetConf().GetBaseConfig() - xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression) - if cfg.UseEncryption { - local, err = libio.WithEncryption(local, []byte(serverCfg.Token)) - if err != nil { - xl.Error("create encryption stream error: %v", err) - return - } - } - if cfg.UseCompression { - local = libio.WithCompression(local) - } - - if pxy.GetLimiter() != nil { - local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error { - return local.Close() - }) - } - - xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), - workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) - - name := pxy.GetName() - proxyType := pxy.GetConf().GetBaseConfig().ProxyType - metrics.Server.OpenConnection(name, proxyType) - inCount, outCount, _ := libio.Join(local, userConn) - metrics.Server.CloseConnection(name, proxyType) - metrics.Server.AddTrafficIn(name, proxyType, inCount) - metrics.Server.AddTrafficOut(name, proxyType, outCount) - xl.Debug("join connections closed") + return pxy, nil } type Manager struct { diff --git a/server/proxy/stcp.go b/server/proxy/stcp.go index f5311be..7c1511a 100644 --- a/server/proxy/stcp.go +++ b/server/proxy/stcp.go @@ -15,16 +15,31 @@ package proxy import ( - "golang.org/x/time/rate" + "reflect" "github.com/fatedier/frp/pkg/config" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.STCPProxyConf{}), NewSTCPProxy) +} + type STCPProxy struct { *BaseProxy cfg *config.STCPProxyConf } +func NewSTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.STCPProxyConf) + if !ok { + return nil + } + return &STCPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *STCPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl allowUsers := pxy.cfg.AllowUsers @@ -40,7 +55,7 @@ func (pxy *STCPProxy) Run() (remoteAddr string, err error) { pxy.listeners = append(pxy.listeners, listener) xl.Info("stcp proxy custom listen success") - pxy.startListenHandler(pxy, HandleUserTCPConnection) + pxy.startCommonTCPListenersHandler() return } @@ -48,10 +63,6 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *STCPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *STCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/sudp.go b/server/proxy/sudp.go index 82bf8d2..492b095 100644 --- a/server/proxy/sudp.go +++ b/server/proxy/sudp.go @@ -15,16 +15,31 @@ package proxy import ( - "golang.org/x/time/rate" + "reflect" "github.com/fatedier/frp/pkg/config" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.SUDPProxyConf{}), NewSUDPProxy) +} + type SUDPProxy struct { *BaseProxy cfg *config.SUDPProxyConf } +func NewSUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.SUDPProxyConf) + if !ok { + return nil + } + return &SUDPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *SUDPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl allowUsers := pxy.cfg.AllowUsers @@ -40,7 +55,7 @@ func (pxy *SUDPProxy) Run() (remoteAddr string, err error) { pxy.listeners = append(pxy.listeners, listener) xl.Info("sudp proxy custom listen success") - pxy.startListenHandler(pxy, HandleUserTCPConnection) + pxy.startCommonTCPListenersHandler() return } @@ -48,10 +63,6 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *SUDPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *SUDPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 1ba0fb1..5bb5c1a 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -17,24 +17,39 @@ package proxy import ( "fmt" "net" + "reflect" "strconv" - "golang.org/x/time/rate" - "github.com/fatedier/frp/pkg/config" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.TCPProxyConf{}), NewTCPProxy) +} + type TCPProxy struct { *BaseProxy cfg *config.TCPProxyConf - realPort int + realBindPort int +} + +func NewTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.TCPProxyConf) + if !ok { + return nil + } + baseProxy.usedPortsNum = 1 + return &TCPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } } func (pxy *TCPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl if pxy.cfg.Group != "" { - l, realPort, errRet := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort) + l, realBindPort, errRet := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort) if errRet != nil { err = errRet return @@ -44,20 +59,20 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) { l.Close() } }() - pxy.realPort = realPort + pxy.realBindPort = realBindPort pxy.listeners = append(pxy.listeners, l) xl.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) } else { - pxy.realPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) + pxy.realBindPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) if err != nil { return } defer func() { if err != nil { - pxy.rc.TCPPortManager.Release(pxy.realPort) + pxy.rc.TCPPortManager.Release(pxy.realBindPort) } }() - listener, errRet := net.Listen("tcp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realPort))) + listener, errRet := net.Listen("tcp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort))) if errRet != nil { err = errRet return @@ -66,9 +81,9 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) { xl.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort) } - pxy.cfg.RemotePort = pxy.realPort - remoteAddr = fmt.Sprintf(":%d", pxy.realPort) - pxy.startListenHandler(pxy, HandleUserTCPConnection) + pxy.cfg.RemotePort = pxy.realBindPort + remoteAddr = fmt.Sprintf(":%d", pxy.realBindPort) + pxy.startCommonTCPListenersHandler() return } @@ -76,13 +91,9 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *TCPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *TCPProxy) Close() { pxy.BaseProxy.Close() if pxy.cfg.Group == "" { - pxy.rc.TCPPortManager.Release(pxy.realPort) + pxy.rc.TCPPortManager.Release(pxy.realBindPort) } } diff --git a/server/proxy/tcpmux.go b/server/proxy/tcpmux.go index 23e833b..6d25a84 100644 --- a/server/proxy/tcpmux.go +++ b/server/proxy/tcpmux.go @@ -17,21 +17,35 @@ package proxy import ( "fmt" "net" + "reflect" "strings" - "golang.org/x/time/rate" - "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/consts" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.TCPMuxProxyConf{}), NewTCPMuxProxy) +} + type TCPMuxProxy struct { *BaseProxy cfg *config.TCPMuxProxyConf } +func NewTCPMuxProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.TCPMuxProxyConf) + if !ok { + return nil + } + return &TCPMuxProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *TCPMuxProxy) httpConnectListen( domain, routeByHTTPUser, httpUser, httpPwd string, addrs []string) ([]string, error, ) { @@ -78,7 +92,7 @@ func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) { } } - pxy.startListenHandler(pxy, HandleUserTCPConnection) + pxy.startCommonTCPListenersHandler() remoteAddr = strings.Join(addrs, ",") return remoteAddr, err } @@ -101,10 +115,6 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *TCPMuxProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/udp.go b/server/proxy/udp.go index e047c9d..20108bf 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -19,12 +19,12 @@ import ( "fmt" "io" "net" + "reflect" "strconv" "time" "github.com/fatedier/golib/errors" libio "github.com/fatedier/golib/io" - "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" @@ -34,11 +34,15 @@ import ( "github.com/fatedier/frp/server/metrics" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.UDPProxyConf{}), NewUDPProxy) +} + type UDPProxy struct { *BaseProxy cfg *config.UDPProxyConf - realPort int + realBindPort int // udpConn is the listener of udp packages udpConn *net.UDPConn @@ -59,21 +63,33 @@ type UDPProxy struct { isClosed bool } +func NewUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.UDPProxyConf) + if !ok { + return nil + } + baseProxy.usedPortsNum = 1 + return &UDPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *UDPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl - pxy.realPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) + pxy.realBindPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) if err != nil { return "", fmt.Errorf("acquire port %d error: %v", pxy.cfg.RemotePort, err) } defer func() { if err != nil { - pxy.rc.UDPPortManager.Release(pxy.realPort) + pxy.rc.UDPPortManager.Release(pxy.realBindPort) } }() - remoteAddr = fmt.Sprintf(":%d", pxy.realPort) - pxy.cfg.RemotePort = pxy.realPort - addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realPort))) + remoteAddr = fmt.Sprintf(":%d", pxy.realBindPort) + pxy.cfg.RemotePort = pxy.realBindPort + addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort))) if errRet != nil { err = errRet return @@ -233,10 +249,6 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *UDPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *UDPProxy) Close() { pxy.mu.Lock() defer pxy.mu.Unlock() @@ -254,5 +266,5 @@ func (pxy *UDPProxy) Close() { close(pxy.readCh) close(pxy.sendCh) } - pxy.rc.UDPPortManager.Release(pxy.realPort) + pxy.rc.UDPPortManager.Release(pxy.realBindPort) } diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go index 9f4b9f4..5d6871e 100644 --- a/server/proxy/xtcp.go +++ b/server/proxy/xtcp.go @@ -16,14 +16,18 @@ package proxy import ( "fmt" + "reflect" "github.com/fatedier/golib/errors" - "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" ) +func init() { + RegisterProxyFactory(reflect.TypeOf(&config.XTCPProxyConf{}), NewXTCPProxy) +} + type XTCPProxy struct { *BaseProxy cfg *config.XTCPProxyConf @@ -31,6 +35,17 @@ type XTCPProxy struct { closeCh chan struct{} } +func NewXTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy { + unwrapped, ok := cfg.(*config.XTCPProxyConf) + if !ok { + return nil + } + return &XTCPProxy{ + BaseProxy: baseProxy, + cfg: unwrapped, + } +} + func (pxy *XTCPProxy) Run() (remoteAddr string, err error) { xl := pxy.xl @@ -72,10 +87,6 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *XTCPProxy) GetLimiter() *rate.Limiter { - return pxy.limiter -} - func (pxy *XTCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.NatHoleController.CloseClient(pxy.GetName())