From c7a0cfc66d98a036d07c851b7fbb5852d906394e Mon Sep 17 00:00:00 2001 From: fatedier Date: Tue, 30 May 2023 10:55:00 +0800 Subject: [PATCH] xtcp: when connection timeout occurs, support fallback to STCP (#3460) --- client/service.go | 6 ++-- client/visitor/stcp.go | 24 ++++++++++--- client/visitor/sudp.go | 1 + client/visitor/visitor.go | 20 +++++++++++ client/visitor/visitor_manager.go | 36 ++++++++++++------- client/visitor/xtcp.go | 60 ++++++++++++++++++++++++++----- cmd/frpc/sub/root.go | 6 ++-- conf/frpc_full.ini | 6 ++++ pkg/config/client_test.go | 7 ++-- pkg/config/visitor.go | 28 ++++++++++----- pkg/config/visitor_test.go | 7 ++-- pkg/nathole/analysis.go | 30 ++++++++-------- pkg/nathole/classify.go | 11 +++--- pkg/nathole/nathole.go | 2 +- test/e2e/basic/basic.go | 2 +- test/e2e/basic/xtcp.go | 52 +++++++++++++++++++++++++++ 16 files changed, 230 insertions(+), 68 deletions(-) create mode 100644 test/e2e/basic/xtcp.go diff --git a/client/service.go b/client/service.go index e6ff6d9..0849484 100644 --- a/client/service.go +++ b/client/service.go @@ -369,7 +369,8 @@ func (cm *ConnectionManager) OpenConnection() error { } tlsConfig.NextProtos = []string{"frp"} - conn, err := quic.DialAddr( + conn, err := quic.DialAddrContext( + cm.ctx, net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)), tlsConfig, &quic.Config{ MaxIdleTimeout: time.Duration(cm.cfg.QUICMaxIdleTimeout) * time.Second, @@ -467,7 +468,8 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) { Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, cm.cfg.DisableCustomTLSFirstByte), }), ) - conn, err := libdial.Dial( + conn, err := libdial.DialContext( + cm.ctx, net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)), dialOptions..., ) diff --git a/client/visitor/stcp.go b/client/visitor/stcp.go index 672aadf..7086c61 100644 --- a/client/visitor/stcp.go +++ b/client/visitor/stcp.go @@ -35,17 +35,20 @@ type STCPVisitor struct { } func (sv *STCPVisitor) Run() (err error) { - sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort))) - if err != nil { - return + if sv.cfg.BindPort > 0 { + sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort))) + if err != nil { + return + } + go sv.worker() } - go sv.worker() + go sv.internalConnWorker() return } func (sv *STCPVisitor) Close() { - sv.l.Close() + sv.BaseVisitor.Close() } func (sv *STCPVisitor) worker() { @@ -56,7 +59,18 @@ func (sv *STCPVisitor) worker() { xl.Warn("stcp local listener closed") return } + go sv.handleConn(conn) + } +} +func (sv *STCPVisitor) internalConnWorker() { + xl := xlog.FromContextSafe(sv.ctx) + for { + conn, err := sv.internalLn.Accept() + if err != nil { + xl.Warn("stcp internal listener closed") + return + } go sv.handleConn(conn) } } diff --git a/client/visitor/sudp.go b/client/visitor/sudp.go index e93d877..1e052c3 100644 --- a/client/visitor/sudp.go +++ b/client/visitor/sudp.go @@ -254,6 +254,7 @@ func (sv *SUDPVisitor) Close() { default: close(sv.checkCloseCh) } + sv.BaseVisitor.Close() if sv.udpConn != nil { sv.udpConn.Close() } diff --git a/client/visitor/visitor.go b/client/visitor/visitor.go index 1f6471d..10c0ab1 100644 --- a/client/visitor/visitor.go +++ b/client/visitor/visitor.go @@ -21,12 +21,14 @@ import ( "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/transport" + utilnet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/xlog" ) // Visitor is used for forward traffics from local port tot remote service. type Visitor interface { Run() error + AcceptConn(conn net.Conn) error Close() } @@ -35,14 +37,17 @@ func NewVisitor( cfg config.VisitorConf, clientCfg config.ClientCommonConf, connectServer func() (net.Conn, error), + transferConn func(string, net.Conn) error, msgTransporter transport.MessageTransporter, ) (visitor Visitor) { xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName) baseVisitor := BaseVisitor{ clientCfg: clientCfg, connectServer: connectServer, + transferConn: transferConn, msgTransporter: msgTransporter, ctx: xlog.NewContext(ctx, xl), + internalLn: utilnet.NewInternalListener(), } switch cfg := cfg.(type) { case *config.STCPVisitorConf: @@ -69,9 +74,24 @@ func NewVisitor( type BaseVisitor struct { clientCfg config.ClientCommonConf connectServer func() (net.Conn, error) + transferConn func(string, net.Conn) error msgTransporter transport.MessageTransporter l net.Listener + internalLn *utilnet.InternalListener mu sync.RWMutex ctx context.Context } + +func (v *BaseVisitor) AcceptConn(conn net.Conn) error { + return v.internalLn.PutConn(conn) +} + +func (v *BaseVisitor) Close() { + if v.l != nil { + v.l.Close() + } + if v.internalLn != nil { + v.internalLn.Close() + } +} diff --git a/client/visitor/visitor_manager.go b/client/visitor/visitor_manager.go index 02b7e49..799ee3a 100644 --- a/client/visitor/visitor_manager.go +++ b/client/visitor/visitor_manager.go @@ -16,6 +16,7 @@ package visitor import ( "context" + "fmt" "net" "sync" "time" @@ -34,7 +35,7 @@ type Manager struct { checkInterval time.Duration - mu sync.Mutex + mu sync.RWMutex ctx context.Context stopCh chan struct{} @@ -83,11 +84,24 @@ func (vm *Manager) Run() { } } +func (vm *Manager) Close() { + vm.mu.Lock() + defer vm.mu.Unlock() + for _, v := range vm.visitors { + v.Close() + } + select { + case <-vm.stopCh: + default: + close(vm.stopCh) + } +} + // Hold lock before calling this function. func (vm *Manager) startVisitor(cfg config.VisitorConf) (err error) { xl := xlog.FromContextSafe(vm.ctx) name := cfg.GetBaseInfo().ProxyName - visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.connectServer, vm.msgTransporter) + visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.connectServer, vm.TransferConn, vm.msgTransporter) err = visitor.Run() if err != nil { xl.Warn("start error: %v", err) @@ -139,15 +153,13 @@ func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) { } } -func (vm *Manager) Close() { - vm.mu.Lock() - defer vm.mu.Unlock() - for _, v := range vm.visitors { - v.Close() - } - select { - case <-vm.stopCh: - default: - close(vm.stopCh) +// TransferConn transfers a connection to a visitor. +func (vm *Manager) TransferConn(name string, conn net.Conn) error { + vm.mu.RLock() + defer vm.mu.RUnlock() + v, ok := vm.visitors[name] + if !ok { + return fmt.Errorf("visitor [%s] not found", name) } + return v.AcceptConn(conn) } diff --git a/client/visitor/xtcp.go b/client/visitor/xtcp.go index 7cbb90e..5c4e2cf 100644 --- a/client/visitor/xtcp.go +++ b/client/visitor/xtcp.go @@ -59,12 +59,15 @@ func (sv *XTCPVisitor) Run() (err error) { sv.session = NewQUICTunnelSession(&sv.clientCfg) } - sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort))) - if err != nil { - return + if sv.cfg.BindPort > 0 { + sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort))) + if err != nil { + return + } + go sv.worker() } - go sv.worker() + go sv.internalConnWorker() go sv.processTunnelStartEvents() if sv.cfg.KeepTunnelOpen { sv.retryLimiter = rate.NewLimiter(rate.Every(time.Hour/time.Duration(sv.cfg.MaxRetriesAnHour)), sv.cfg.MaxRetriesAnHour) @@ -74,8 +77,12 @@ func (sv *XTCPVisitor) Run() (err error) { } func (sv *XTCPVisitor) Close() { - sv.l.Close() - sv.cancel() + sv.mu.Lock() + defer sv.mu.Unlock() + sv.BaseVisitor.Close() + if sv.cancel != nil { + sv.cancel() + } if sv.session != nil { sv.session.Close() } @@ -89,7 +96,18 @@ func (sv *XTCPVisitor) worker() { xl.Warn("xtcp local listener closed") return } + go sv.handleConn(conn) + } +} +func (sv *XTCPVisitor) internalConnWorker() { + xl := xlog.FromContextSafe(sv.ctx) + for { + conn, err := sv.internalLn.Accept() + if err != nil { + xl.Warn("xtcp internal listener closed") + return + } go sv.handleConn(conn) } } @@ -139,15 +157,37 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() { func (sv *XTCPVisitor) handleConn(userConn net.Conn) { xl := xlog.FromContextSafe(sv.ctx) - defer userConn.Close() + isConnTrasfered := false + defer func() { + if !isConnTrasfered { + userConn.Close() + } + }() xl.Debug("get a new xtcp user connection") // Open a tunnel connection to the server. If there is already a successful hole-punching connection, // it will be reused. Otherwise, it will block and wait for a successful hole-punching connection until timeout. - tunnelConn, err := sv.openTunnel() + ctx := context.Background() + if sv.cfg.FallbackTo != "" { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(sv.cfg.FallbackTimeoutMs)*time.Millisecond) + defer cancel() + ctx = timeoutCtx + } + tunnelConn, err := sv.openTunnel(ctx) if err != nil { xl.Error("open tunnel error: %v", err) + // no fallback, just return + if sv.cfg.FallbackTo == "" { + return + } + + xl.Debug("try to transfer connection to visitor: %s", sv.cfg.FallbackTo) + if err := sv.transferConn(sv.cfg.FallbackTo, userConn); err != nil { + xl.Error("transfer connection to visitor %s error: %v", sv.cfg.FallbackTo, err) + return + } + isConnTrasfered = true return } @@ -171,7 +211,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) { } // openTunnel will open a tunnel connection to the target server. -func (sv *XTCPVisitor) openTunnel() (conn net.Conn, err error) { +func (sv *XTCPVisitor) openTunnel(ctx context.Context) (conn net.Conn, err error) { xl := xlog.FromContextSafe(sv.ctx) ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -185,6 +225,8 @@ func (sv *XTCPVisitor) openTunnel() (conn net.Conn, err error) { select { case <-sv.ctx.Done(): return nil, sv.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() case <-immediateTrigger: conn, err = sv.getTunnelConn() case <-ticker.C: diff --git a/cmd/frpc/sub/root.go b/cmd/frpc/sub/root.go index 8271d06..5779750 100644 --- a/cmd/frpc/sub/root.go +++ b/cmd/frpc/sub/root.go @@ -117,7 +117,6 @@ var rootCmd = &cobra.Command{ // Do not show command usage here. err := runClient(cfgFile) if err != nil { - fmt.Println(err) os.Exit(1) } return nil @@ -199,6 +198,7 @@ func parseClientCommonCfgFromCmd() (cfg config.ClientCommonConf, err error) { func runClient(cfgFilePath string) error { cfg, pxyCfgs, visitorCfgs, err := config.ParseClientConfig(cfgFilePath) if err != nil { + fmt.Println(err) return err } return startService(cfg, pxyCfgs, visitorCfgs, cfgFilePath) @@ -214,8 +214,8 @@ func startService( cfg.LogMaxDays, cfg.DisableLogColor) if cfgFile != "" { - log.Trace("start frpc service for config file [%s]", cfgFile) - defer log.Trace("frpc service for config file [%s] stopped", cfgFile) + log.Info("start frpc service for config file [%s]", cfgFile) + defer log.Info("frpc service for config file [%s] stopped", cfgFile) } svr, errRet := client.NewService(cfg, pxyCfgs, visitorCfgs, cfgFile) if errRet != nil { diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index e18d2a0..1c609f6 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -337,6 +337,8 @@ server_name = secret_tcp sk = abcdefg # connect this address to visitor stcp server bind_addr = 127.0.0.1 +# bind_port can be less than 0, it means don't bind to the port and only receive connections redirected from +# other visitors. (This is not supported for SUDP now) bind_port = 9000 use_encryption = false use_compression = false @@ -355,6 +357,8 @@ type = xtcp server_name = p2p_tcp sk = abcdefg bind_addr = 127.0.0.1 +# bind_port can be less than 0, it means don't bind to the port and only receive connections redirected from +# other visitors. (This is not supported for SUDP now) bind_port = 9001 use_encryption = false use_compression = false @@ -363,6 +367,8 @@ keep_tunnel_open = false # effective when keep_tunnel_open is set to true, the number of attempts to punch through per hour max_retries_an_hour = 8 min_retry_interval = 90 +# fallback_to = stcp_visitor +# fallback_timeout_ms = 500 [tcpmuxhttpconnect] type = tcpmux diff --git a/pkg/config/client_test.go b/pkg/config/client_test.go index bff47cd..e6ce3a1 100644 --- a/pkg/config/client_test.go +++ b/pkg/config/client_test.go @@ -661,9 +661,10 @@ func Test_LoadClientBasicConf(t *testing.T) { BindAddr: "127.0.0.1", BindPort: 9001, }, - Protocol: "quic", - MaxRetriesAnHour: 8, - MinRetryInterval: 90, + Protocol: "quic", + MaxRetriesAnHour: 8, + MinRetryInterval: 90, + FallbackTimeoutMs: 1000, }, } diff --git a/pkg/config/visitor.go b/pkg/config/visitor.go index 0ee010f..808240b 100644 --- a/pkg/config/visitor.go +++ b/pkg/config/visitor.go @@ -49,7 +49,10 @@ type BaseVisitorConf struct { Sk string `ini:"sk" json:"sk"` ServerName string `ini:"server_name" json:"server_name"` BindAddr string `ini:"bind_addr" json:"bind_addr"` - BindPort int `ini:"bind_port" json:"bind_port"` + // BindPort is the port that visitor listens on. + // It can be less than 0, it means don't bind to the port and only receive connections redirected from + // other visitors. (This is not supported for SUDP now) + BindPort int `ini:"bind_port" json:"bind_port"` } type SUDPVisitorConf struct { @@ -63,10 +66,12 @@ type STCPVisitorConf struct { type XTCPVisitorConf struct { BaseVisitorConf `ini:",extends"` - Protocol string `ini:"protocol" json:"protocol,omitempty"` - KeepTunnelOpen bool `ini:"keep_tunnel_open" json:"keep_tunnel_open,omitempty"` - MaxRetriesAnHour int `ini:"max_retries_an_hour" json:"max_retries_an_hour,omitempty"` - MinRetryInterval int `ini:"min_retry_interval" json:"min_retry_interval,omitempty"` + Protocol string `ini:"protocol" json:"protocol,omitempty"` + KeepTunnelOpen bool `ini:"keep_tunnel_open" json:"keep_tunnel_open,omitempty"` + MaxRetriesAnHour int `ini:"max_retries_an_hour" json:"max_retries_an_hour,omitempty"` + MinRetryInterval int `ini:"min_retry_interval" json:"min_retry_interval,omitempty"` + FallbackTo string `ini:"fallback_to" json:"fallback_to,omitempty"` + FallbackTimeoutMs int `ini:"fallback_timeout_ms" json:"fallback_timeout_ms,omitempty"` } // DefaultVisitorConf creates a empty VisitorConf object by visitorType. @@ -134,7 +139,9 @@ func (cfg *BaseVisitorConf) check() (err error) { err = fmt.Errorf("bind_addr shouldn't be empty") return } - if cfg.BindPort <= 0 { + // BindPort can be less than 0, it means don't bind to the port and only receive connections redirected from + // other visitors + if cfg.BindPort == 0 { err = fmt.Errorf("bind_port is required") return } @@ -155,7 +162,6 @@ func (cfg *BaseVisitorConf) unmarshalFromIni(prefix string, name string, section if cfg.BindAddr == "" { cfg.BindAddr = "127.0.0.1" } - return nil } @@ -169,7 +175,6 @@ func preVisitorUnmarshalFromIni(cfg VisitorConf, prefix string, name string, sec if err != nil { return err } - return nil } @@ -268,7 +273,9 @@ func (cfg *XTCPVisitorConf) Compare(cmp VisitorConf) bool { if cfg.Protocol != cmpConf.Protocol || cfg.KeepTunnelOpen != cmpConf.KeepTunnelOpen || cfg.MaxRetriesAnHour != cmpConf.MaxRetriesAnHour || - cfg.MinRetryInterval != cmpConf.MinRetryInterval { + cfg.MinRetryInterval != cmpConf.MinRetryInterval || + cfg.FallbackTo != cmpConf.FallbackTo || + cfg.FallbackTimeoutMs != cmpConf.FallbackTimeoutMs { return false } return true @@ -290,6 +297,9 @@ func (cfg *XTCPVisitorConf) UnmarshalFromIni(prefix string, name string, section if cfg.MinRetryInterval <= 0 { cfg.MinRetryInterval = 90 } + if cfg.FallbackTimeoutMs <= 0 { + cfg.FallbackTimeoutMs = 1000 + } return } diff --git a/pkg/config/visitor_test.go b/pkg/config/visitor_test.go index d91f90b..0f4171d 100644 --- a/pkg/config/visitor_test.go +++ b/pkg/config/visitor_test.go @@ -87,9 +87,10 @@ func Test_Visitor_UnmarshalFromIni(t *testing.T) { BindAddr: "127.0.0.1", BindPort: 9001, }, - Protocol: "quic", - MaxRetriesAnHour: 8, - MinRetryInterval: 90, + Protocol: "quic", + MaxRetriesAnHour: 8, + MinRetryInterval: 90, + FallbackTimeoutMs: 1000, }, }, } diff --git a/pkg/nathole/analysis.go b/pkg/nathole/analysis.go index 772fd8f..75f9a55 100644 --- a/pkg/nathole/analysis.go +++ b/pkg/nathole/analysis.go @@ -63,20 +63,20 @@ var ( } // mode 2, HardNAT is receiver, EasyNAT is sender - // sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports, ttl 7 - // sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports, ttl 4 - // sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports + // sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports, ttl 7 + // sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports, ttl 4 + // sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports mode2Behaviors = []lo.Tuple2[RecommandBehavior, RecommandBehavior]{ lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7}, ), lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4}, ), lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256}, ), } @@ -98,21 +98,21 @@ var ( } // mode 4, Regular ports changes are usually the sender. - // sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 7, portsRangeNumber 10 - // sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 4, portsRangeNumber 10 - // sender, portsRandomNumber 1000, SendDelayMs: 2000 | receiver, listen 256 ports, portsRangeNumber 10 + // sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 7, portsRangeNumber 2 + // sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 4, portsRangeNumber 2 + // sender, portsRandomNumber 1000, SendDelayMs: 2000 | receiver, listen 256 ports, portsRangeNumber 2 mode4Behaviors = []lo.Tuple2[RecommandBehavior, RecommandBehavior]{ lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, - RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7, PortsRangeNumber: 10}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, + RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7, PortsRangeNumber: 2}, ), lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, - RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4, PortsRangeNumber: 10}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, + RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4, PortsRangeNumber: 2}, ), lo.T2( - RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000}, - RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, PortsRangeNumber: 10}, + RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000}, + RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, PortsRangeNumber: 2}, ), } ) diff --git a/pkg/nathole/classify.go b/pkg/nathole/classify.go index 79f8efe..1ec5952 100644 --- a/pkg/nathole/classify.go +++ b/pkg/nathole/classify.go @@ -85,11 +85,6 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err } } - natFeature.PortsDifference = portMax - portMin - if natFeature.PortsDifference <= 10 && natFeature.PortsDifference >= 1 { - natFeature.RegularPortsChange = true - } - switch { case ipChanged && portChanged: natFeature.NatType = HardNAT @@ -104,6 +99,12 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err natFeature.NatType = EasyNAT natFeature.Behavior = BehaviorNoChange } + if natFeature.Behavior == BehaviorPortChanged { + natFeature.PortsDifference = portMax - portMin + if natFeature.PortsDifference <= 5 && natFeature.PortsDifference >= 1 { + natFeature.RegularPortsChange = true + } + } return natFeature, nil } diff --git a/pkg/nathole/nathole.go b/pkg/nathole/nathole.go index a4d5e46..c10eb30 100644 --- a/pkg/nathole/nathole.go +++ b/pkg/nathole/nathole.go @@ -384,7 +384,7 @@ func sendSidMessageToRangePorts( if err := sendFunc(conn, detectAddr); err != nil { xl.Trace("send sid message from %s to %s error: %v", conn.LocalAddr(), detectAddr, err) } - time.Sleep(5 * time.Millisecond) + time.Sleep(2 * time.Millisecond) } } } diff --git a/test/e2e/basic/basic.go b/test/e2e/basic/basic.go index 111e494..9f5914d 100644 --- a/test/e2e/basic/basic.go +++ b/test/e2e/basic/basic.go @@ -376,7 +376,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() { for _, test := range tests { framework.NewRequestExpect(f). RequestModify(func(r *request.Request) { - r.Timeout(5 * time.Second) + r.Timeout(10 * time.Second) }). Protocol(protocol). PortName(test.bindPortName). diff --git a/test/e2e/basic/xtcp.go b/test/e2e/basic/xtcp.go new file mode 100644 index 0000000..a501d79 --- /dev/null +++ b/test/e2e/basic/xtcp.go @@ -0,0 +1,52 @@ +package basic + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + + "github.com/fatedier/frp/test/e2e/framework" + "github.com/fatedier/frp/test/e2e/framework/consts" + "github.com/fatedier/frp/test/e2e/pkg/port" + "github.com/fatedier/frp/test/e2e/pkg/request" +) + +var _ = ginkgo.Describe("[Feature: XTCP]", func() { + f := framework.NewDefaultFramework() + + ginkgo.It("Fallback To STCP", func() { + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + + bindPortName := port.GenName("XTCP") + clientConf += fmt.Sprintf(` + [foo] + type = stcp + local_port = {{ .%s }} + + [foo-visitor] + type = stcp + role = visitor + server_name = foo + bind_port = -1 + + [bar-visitor] + type = xtcp + role = visitor + server_name = bar + bind_port = {{ .%s }} + keep_tunnel_open = true + fallback_to = foo-visitor + fallback_timeout_ms = 200 + `, framework.TCPEchoServerPort, bindPortName) + + f.RunProcesses([]string{serverConf}, []string{clientConf}) + framework.NewRequestExpect(f). + RequestModify(func(r *request.Request) { + r.Timeout(time.Second) + }). + PortName(bindPortName). + Ensure() + }) +})