diff --git a/client/admin_api.go b/client/admin_api.go index af61b39..1acbb95 100644 --- a/client/admin_api.go +++ b/client/admin_api.go @@ -88,6 +88,7 @@ type StatusResp struct { Https []ProxyStatusResp `json:"https"` Stcp []ProxyStatusResp `json:"stcp"` Xtcp []ProxyStatusResp `json:"xtcp"` + Sudp []ProxyStatusResp `json:"sudp"` } type ProxyStatusResp struct { @@ -155,6 +156,11 @@ func NewProxyStatusResp(status *proxy.ProxyStatus, serverAddr string) ProxyStatu psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) } psr.Plugin = cfg.Plugin + case *config.SudpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin } return psr } @@ -171,6 +177,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { res.Https = make([]ProxyStatusResp, 0) res.Stcp = make([]ProxyStatusResp, 0) res.Xtcp = make([]ProxyStatusResp, 0) + res.Sudp = make([]ProxyStatusResp, 0) log.Info("Http request [/api/status]") defer func() { @@ -194,6 +201,8 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { res.Stcp = append(res.Stcp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) case "xtcp": res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) + case "sudp": + res.Sudp = append(res.Sudp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) } } sort.Sort(ByProxyStatusResp(res.Tcp)) @@ -202,6 +211,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { sort.Sort(ByProxyStatusResp(res.Https)) sort.Sort(ByProxyStatusResp(res.Stcp)) sort.Sort(ByProxyStatusResp(res.Xtcp)) + sort.Sort(ByProxyStatusResp(res.Sudp)) return } diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index c263e1d..0c9ea52 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl BaseProxy: &baseProxy, cfg: cfg, } + case *config.SudpProxyConf: + pxy = &SudpProxy{ + BaseProxy: &baseProxy, + cfg: cfg, + closeCh: make(chan struct{}), + } } return } @@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh) } +type SudpProxy struct { + *BaseProxy + + cfg *config.SudpProxyConf + + localAddr *net.UDPAddr + + closeCh chan struct{} +} + +func (pxy *SudpProxy) Run() (err error) { + pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort)) + if err != nil { + return + } + return +} + +func (pxy *SudpProxy) Close() { + pxy.mu.Lock() + defer pxy.mu.Unlock() + select { + case <-pxy.closeCh: + return + default: + close(pxy.closeCh) + } +} + +func (pxy *SudpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + xl := pxy.xl + xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String()) + + if pxy.limiter != nil { + rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + return conn.Close() + }) + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) + } + + workConn := conn + readCh := make(chan *msg.UdpPacket, 1024) + sendCh := make(chan msg.Message, 1024) + isClose := false + + mu := &sync.Mutex{} + + closeFn := func() { + mu.Lock() + defer mu.Unlock() + if isClose { + return + } + + isClose = true + if workConn != nil { + workConn.Close() + } + close(readCh) + close(sendCh) + } + + // udp service <- frpc <- frps <- frpc visitor <- user + workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) { + defer closeFn() + + for { + // first to check sudp proxy is closed or not + select { + case <-pxy.closeCh: + xl.Trace("frpc sudp proxy is closed") + return + default: + } + + var udpMsg msg.UdpPacket + if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { + xl.Warn("read from workConn for sudp error: %v", errRet) + return + } + + if errRet := errors.PanicToError(func() { + readCh <- &udpMsg + }); errRet != nil { + xl.Warn("reader goroutine for sudp work connection closed: %v", errRet) + return + } + } + } + + // udp service -> frpc -> frps -> frpc visitor -> user + workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { + defer func() { + closeFn() + xl.Info("writer goroutine for sudp work connection closed") + }() + + var errRet error + for rawMsg := range sendCh { + switch m := rawMsg.(type) { + case *msg.UdpPacket: + xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]", + m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String()) + case *msg.Ping: + xl.Trace("frpc send ping message to frpc visitor") + } + + if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { + xl.Error("sudp work write error: %v", errRet) + return + } + } + } + + heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + closeFn() + }() + + var errRet error + for { + select { + case <-ticker.C: + if errRet = errors.PanicToError(func() { + sendCh <- &msg.Ping{} + }); errRet != nil { + xl.Warn("heartbeat goroutine for sudp work connection closed") + return + } + case <-pxy.closeCh: + xl.Trace("frpc sudp proxy is closed") + return + } + } + } + + go workConnSenderFn(workConn, sendCh) + go workConnReaderFn(workConn, readCh) + go heartbeatFn(workConn, sendCh) + + udp.Forwarder(pxy.localAddr, readCh, sendCh) +} + // Common handler for tcp work connections. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { diff --git a/client/visitor.go b/client/visitor.go index a4900e0..d300486 100644 --- a/client/visitor.go +++ b/client/visitor.go @@ -26,10 +26,12 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/proto/udp" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/xlog" + "github.com/fatedier/golib/errors" frpIo "github.com/fatedier/golib/io" "github.com/fatedier/golib/pool" fmux "github.com/hashicorp/yamux" @@ -58,6 +60,12 @@ func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visi BaseVisitor: &baseVisitor, cfg: cfg, } + case *config.SudpVisitorConf: + visitor = &SudpVisitor{ + BaseVisitor: &baseVisitor, + cfg: cfg, + checkCloseCh: make(chan struct{}), + } } return } @@ -328,3 +336,204 @@ func (sv *XtcpVisitor) handleConn(userConn net.Conn) { frpIo.Join(userConn, muxConnRWCloser) xl.Debug("join connections closed") } + +type SudpVisitor struct { + *BaseVisitor + + checkCloseCh chan struct{} + // udpConn is the listener of udp packet + udpConn *net.UDPConn + readCh chan *msg.UdpPacket + sendCh chan *msg.UdpPacket + + cfg *config.SudpVisitorConf +} + +// SUDP Run start listen a udp port +func (sv *SudpVisitor) Run() (err error) { + xl := xlog.FromContextSafe(sv.ctx) + + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort)) + if err != nil { + return fmt.Errorf("sudp ResolveUDPAddr error: %v", err) + } + + sv.udpConn, err = net.ListenUDP("udp", addr) + if err != nil { + return fmt.Errorf("listen udp port %s error: %v", addr.String(), err) + } + + sv.sendCh = make(chan *msg.UdpPacket, 1024) + sv.readCh = make(chan *msg.UdpPacket, 1024) + + xl.Info("sudp start to work") + + go sv.dispatcher() + go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh) + + return +} + +func (sv *SudpVisitor) dispatcher() { + xl := xlog.FromContextSafe(sv.ctx) + + for { + // loop for get frpc to frps tcp conn + // setup worker + // wait worker to finished + // retry or exit + visitorConn, err := sv.getNewVisitorConn() + if err != nil { + // check if proxy is closed + // if checkCloseCh is close, we will return, other case we will continue to reconnect + select { + case <-sv.checkCloseCh: + xl.Info("frpc sudp visitor proxy is closed") + return + default: + } + + time.Sleep(3 * time.Second) + + xl.Warn("newVisitorConn to frps error: %v, try to reconnect", err) + continue + } + + sv.worker(visitorConn) + + select { + case <-sv.checkCloseCh: + return + default: + } + } +} + +func (sv *SudpVisitor) worker(workConn net.Conn) { + xl := xlog.FromContextSafe(sv.ctx) + xl.Debug("starting sudp proxy worker") + + wg := &sync.WaitGroup{} + wg.Add(2) + closeCh := make(chan struct{}) + + // udp service -> frpc -> frps -> frpc visitor -> user + workConnReaderFn := func(conn net.Conn) { + defer func() { + conn.Close() + close(closeCh) + wg.Done() + }() + + for { + var ( + rawMsg msg.Message + errRet error + ) + + // frpc will send heartbeat in workConn to frpc visitor for keeping alive + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { + xl.Warn("read from workconn for user udp conn error: %v", errRet) + return + } + + conn.SetReadDeadline(time.Time{}) + switch m := rawMsg.(type) { + case *msg.Ping: + xl.Debug("frpc visitor get ping message from frpc") + continue + case *msg.UdpPacket: + if errRet := errors.PanicToError(func() { + sv.readCh <- m + xl.Trace("frpc visitor get udp packet from frpc") + }); errRet != nil { + xl.Info("reader goroutine for udp work connection closed") + return + } + } + } + } + + // udp service <- frpc <- frps <- frpc visitor <- user + workConnSenderFn := func(conn net.Conn) { + defer func() { + conn.Close() + wg.Done() + }() + + var errRet error + for { + select { + case udpMsg, ok := <-sv.sendCh: + if !ok { + xl.Info("sender goroutine for udp work connection closed") + return + } + + if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { + xl.Warn("sender goroutine for udp work connection closed: %v", errRet) + return + } + case <-closeCh: + return + } + } + } + + go workConnReaderFn(workConn) + go workConnSenderFn(workConn) + + wg.Wait() + xl.Info("sudp worker is closed") +} + +func (sv *SudpVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) { + visitorConn, err = sv.ctl.connectServer() + if err != nil { + return nil, fmt.Errorf("frpc connect frps error: %v", err) + } + + now := time.Now().Unix() + newVisitorConnMsg := &msg.NewVisitorConn{ + ProxyName: sv.cfg.ServerName, + SignKey: util.GetAuthKey(sv.cfg.Sk, now), + Timestamp: now, + UseEncryption: sv.cfg.UseEncryption, + UseCompression: sv.cfg.UseCompression, + } + err = msg.WriteMsg(visitorConn, newVisitorConnMsg) + if err != nil { + return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err) + } + + var newVisitorConnRespMsg msg.NewVisitorConnResp + visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg) + if err != nil { + return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err) + } + visitorConn.SetReadDeadline(time.Time{}) + + if newVisitorConnRespMsg.Error != "" { + return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error) + } + return +} + +func (sv *SudpVisitor) Close() { + sv.mu.Lock() + defer sv.mu.Unlock() + + select { + case <-sv.checkCloseCh: + return + default: + close(sv.checkCloseCh) + } + if sv.udpConn != nil { + sv.udpConn.Close() + } + close(sv.readCh) + close(sv.sendCh) +} diff --git a/cmd/frpc/sub/sudp.go b/cmd/frpc/sub/sudp.go new file mode 100644 index 0000000..e3e91ab --- /dev/null +++ b/cmd/frpc/sub/sudp.go @@ -0,0 +1,113 @@ +// Copyright 2018 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sub + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/consts" +) + +func init() { + sudpCmd.PersistentFlags().StringVarP(&serverAddr, "server_addr", "s", "127.0.0.1:7000", "frp server's address") + sudpCmd.PersistentFlags().StringVarP(&user, "user", "u", "", "user") + sudpCmd.PersistentFlags().StringVarP(&protocol, "protocol", "p", "tcp", "tcp or kcp or websocket") + sudpCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token") + sudpCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level") + sudpCmd.PersistentFlags().StringVarP(&logFile, "log_file", "", "console", "console or file path") + sudpCmd.PersistentFlags().IntVarP(&logMaxDays, "log_max_days", "", 3, "log file reversed days") + sudpCmd.PersistentFlags().BoolVarP(&disableLogColor, "disable_log_color", "", false, "disable log color in console") + + sudpCmd.PersistentFlags().StringVarP(&proxyName, "proxy_name", "n", "", "proxy name") + sudpCmd.PersistentFlags().StringVarP(&role, "role", "", "server", "role") + sudpCmd.PersistentFlags().StringVarP(&sk, "sk", "", "", "secret key") + sudpCmd.PersistentFlags().StringVarP(&serverName, "server_name", "", "", "server name") + sudpCmd.PersistentFlags().StringVarP(&localIp, "local_ip", "i", "127.0.0.1", "local ip") + sudpCmd.PersistentFlags().IntVarP(&localPort, "local_port", "l", 0, "local port") + sudpCmd.PersistentFlags().StringVarP(&bindAddr, "bind_addr", "", "", "bind addr") + sudpCmd.PersistentFlags().IntVarP(&bindPort, "bind_port", "", 0, "bind port") + sudpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") + sudpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + + rootCmd.AddCommand(sudpCmd) +} + +var sudpCmd = &cobra.Command{ + Use: "sudp", + Short: "Run frpc with a single sudp proxy", + RunE: func(cmd *cobra.Command, args []string) error { + clientCfg, err := parseClientCommonCfg(CfgFileTypeCmd, "") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + proxyConfs := make(map[string]config.ProxyConf) + visitorConfs := make(map[string]config.VisitorConf) + + var prefix string + if user != "" { + prefix = user + "." + } + + if role == "server" { + cfg := &config.SudpProxyConf{} + cfg.ProxyName = prefix + proxyName + cfg.ProxyType = consts.SudpProxy + cfg.UseEncryption = useEncryption + cfg.UseCompression = useCompression + cfg.Role = role + cfg.Sk = sk + cfg.LocalIp = localIp + cfg.LocalPort = localPort + err = cfg.CheckForCli() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + proxyConfs[cfg.ProxyName] = cfg + } else if role == "visitor" { + cfg := &config.SudpVisitorConf{} + cfg.ProxyName = prefix + proxyName + cfg.ProxyType = consts.SudpProxy + cfg.UseEncryption = useEncryption + cfg.UseCompression = useCompression + cfg.Role = role + cfg.Sk = sk + cfg.ServerName = serverName + cfg.BindAddr = bindAddr + cfg.BindPort = bindPort + err = cfg.Check() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + visitorConfs[cfg.ProxyName] = cfg + } else { + fmt.Println("invalid role") + os.Exit(1) + } + + err = startService(clientCfg, proxyConfs, visitorConfs, "") + if err != nil { + os.Exit(1) + } + return nil + }, +} diff --git a/doc/server_plugin.md b/doc/server_plugin.md index 599aa6e..0ba75d7 100644 --- a/doc/server_plugin.md +++ b/doc/server_plugin.md @@ -70,7 +70,7 @@ The response can look like any of the following: ### Operation -Currently `Login`, `NewProxy`, `Ping` and `NewWorkConn` operations are supported. +Currently `Login`, `NewProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. #### Login @@ -172,6 +172,25 @@ New work connection received from frpc (RPC sent after `run_id` is matched with } ``` +#### NewUserConn + +New user connection received from proxy (support `tcp`, `stcp`, `https` and `tcpmux`) . + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "proxy_name": , + "proxy_type": , + "remote_addr": + } +} +``` + ### Server Plugin Configuration ```ini diff --git a/doc/server_plugin_zh.md b/doc/server_plugin_zh.md index 78b71a3..353330b 100644 --- a/doc/server_plugin_zh.md +++ b/doc/server_plugin_zh.md @@ -69,7 +69,7 @@ Response ### 操作类型 -目前插件支持管理的操作类型有 `Login` 和 `NewProxy`。 +目前插件支持管理的操作类型有 `Login`、`NewProxy`、`Ping`、`NewWorkConn` 和 `NewUserConn`。 #### Login @@ -127,6 +127,63 @@ Response } ``` +#### Ping + +心跳相关信息 + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "timestamp": , + "privilege_key": + } +} +``` + +#### NewWorkConn + +新增 `frpc` 连接相关信息 + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "run_id": + "timestamp": , + "privilege_key": + } +} +``` + +#### NewUserConn + +新增 `proxy` 连接相关信息 (支持 `tcp`、`stcp`、`https` 和 `tcpmux` 协议)。 + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "proxy_name": , + "proxy_type": , + "remote_addr": + } +} +``` + + ### frps 中插件配置 ```ini diff --git a/models/auth/token.go b/models/auth/token.go index f7be085..4c680d7 100644 --- a/models/auth/token.go +++ b/models/auth/token.go @@ -81,7 +81,7 @@ func (auth *TokenAuthSetterVerifier) SetPing(pingMsg *msg.Ping) error { } func (auth *TokenAuthSetterVerifier) SetNewWorkConn(newWorkConnMsg *msg.NewWorkConn) error { - if !auth.AuthenticateHeartBeats { + if !auth.AuthenticateNewWorkConns { return nil } diff --git a/models/config/proxy.go b/models/config/proxy.go index 591011f..a1e34ce 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -40,6 +40,7 @@ func init() { proxyConfTypeMap[consts.HttpsProxy] = reflect.TypeOf(HttpsProxyConf{}) proxyConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpProxyConf{}) proxyConfTypeMap[consts.XtcpProxy] = reflect.TypeOf(XtcpProxyConf{}) + proxyConfTypeMap[consts.SudpProxy] = reflect.TypeOf(SudpProxyConf{}) } // NewConfByType creates a empty ProxyConf object by proxyType. @@ -875,6 +876,72 @@ func (cfg *HttpsProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { return } +// SUDP +type SudpProxyConf struct { + BaseProxyConf + + Role string `json:"role"` + Sk string `json:"sk"` +} + +func (cfg *SudpProxyConf) Compare(cmp ProxyConf) bool { + cmpConf, ok := cmp.(*SudpProxyConf) + if !ok { + return false + } + + if !cfg.BaseProxyConf.compare(&cmpConf.BaseProxyConf) || + cfg.Role != cmpConf.Role || + cfg.Sk != cmpConf.Sk { + return false + } + return true +} + +func (cfg *SudpProxyConf) UnmarshalFromIni(prefix string, name string, section ini.Section) (err error) { + if err = cfg.BaseProxyConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + + cfg.Role = section["role"] + if cfg.Role != "server" { + return fmt.Errorf("Parse conf error: proxy [%s] incorrect role [%s]", name, cfg.Role) + } + + cfg.Sk = section["sk"] + + if err = cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + return +} + +func (cfg *SudpProxyConf) MarshalToMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.MarshalToMsg(pMsg) + pMsg.Sk = cfg.Sk +} + +func (cfg *SudpProxyConf) CheckForCli() (err error) { + if err = cfg.BaseProxyConf.checkForCli(); err != nil { + return + } + if cfg.Role != "server" { + err = fmt.Errorf("role should be 'server'") + return + } + return +} + +func (cfg *SudpProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { + return +} + +// Only for role server. +func (cfg *SudpProxyConf) UnmarshalFromMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.UnmarshalFromMsg(pMsg) + cfg.Sk = pMsg.Sk +} + // STCP type StcpProxyConf struct { BaseProxyConf diff --git a/models/config/visitor.go b/models/config/visitor.go index 4233375..ad9ff84 100644 --- a/models/config/visitor.go +++ b/models/config/visitor.go @@ -32,6 +32,7 @@ func init() { visitorConfTypeMap = make(map[string]reflect.Type) visitorConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpVisitorConf{}) visitorConfTypeMap[consts.XtcpProxy] = reflect.TypeOf(XtcpVisitorConf{}) + visitorConfTypeMap[consts.SudpProxy] = reflect.TypeOf(SudpVisitorConf{}) } type VisitorConf interface { @@ -152,6 +153,36 @@ func (cfg *BaseVisitorConf) UnmarshalFromIni(prefix string, name string, section return nil } +type SudpVisitorConf struct { + BaseVisitorConf +} + +func (cfg *SudpVisitorConf) Compare(cmp VisitorConf) bool { + cmpConf, ok := cmp.(*SudpVisitorConf) + if !ok { + return false + } + + if !cfg.BaseVisitorConf.compare(&cmpConf.BaseVisitorConf) { + return false + } + return true +} + +func (cfg *SudpVisitorConf) UnmarshalFromIni(prefix string, name string, section ini.Section) (err error) { + if err = cfg.BaseVisitorConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + return +} + +func (cfg *SudpVisitorConf) Check() (err error) { + if err = cfg.BaseVisitorConf.check(); err != nil { + return + } + return +} + type StcpVisitorConf struct { BaseVisitorConf } diff --git a/models/consts/consts.go b/models/consts/consts.go index 4c1ca4c..e63da54 100644 --- a/models/consts/consts.go +++ b/models/consts/consts.go @@ -30,6 +30,7 @@ var ( HttpsProxy string = "https" StcpProxy string = "stcp" XtcpProxy string = "xtcp" + SudpProxy string = "sudp" // authentication method TokenAuthMethod string = "token" diff --git a/models/plugin/server/manager.go b/models/plugin/server/manager.go index c427f0a..559dae9 100644 --- a/models/plugin/server/manager.go +++ b/models/plugin/server/manager.go @@ -28,6 +28,7 @@ type Manager struct { newProxyPlugins []Plugin pingPlugins []Plugin newWorkConnPlugins []Plugin + newUserConnPlugins []Plugin } func NewManager() *Manager { @@ -36,6 +37,7 @@ func NewManager() *Manager { newProxyPlugins: make([]Plugin, 0), pingPlugins: make([]Plugin, 0), newWorkConnPlugins: make([]Plugin, 0), + newUserConnPlugins: make([]Plugin, 0), } } @@ -52,6 +54,9 @@ func (m *Manager) Register(p Plugin) { if p.IsSupport(OpNewWorkConn) { m.pingPlugins = append(m.pingPlugins, p) } + if p.IsSupport(OpNewUserConn) { + m.newUserConnPlugins = append(m.newUserConnPlugins, p) + } } func (m *Manager) Login(content *LoginContent) (*LoginContent, error) { @@ -189,3 +194,37 @@ func (m *Manager) NewWorkConn(content *NewWorkConnContent) (*NewWorkConnContent, } return content, nil } + +func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent, error) { + if len(m.newUserConnPlugins) == 0 { + return content, nil + } + + var ( + res = &Response{ + Reject: false, + Unchange: true, + } + retContent interface{} + err error + ) + reqid, _ := util.RandId() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.newUserConnPlugins { + res, retContent, err = p.Handle(ctx, OpNewUserConn, *content) + if err != nil { + xl.Info("send NewUserConn request to plugin [%s] error: %v", p.Name(), err) + return nil, errors.New("send NewUserConn request to plugin error") + } + if res.Reject { + return nil, fmt.Errorf("%s", res.RejectReason) + } + if !res.Unchange { + content = retContent.(*NewUserConnContent) + } + } + return content, nil +} diff --git a/models/plugin/server/plugin.go b/models/plugin/server/plugin.go index a89a16b..160d12a 100644 --- a/models/plugin/server/plugin.go +++ b/models/plugin/server/plugin.go @@ -25,6 +25,7 @@ const ( OpNewProxy = "NewProxy" OpPing = "Ping" OpNewWorkConn = "NewWorkConn" + OpNewUserConn = "NewUserConn" ) type Plugin interface { diff --git a/models/plugin/server/types.go b/models/plugin/server/types.go index 017236d..912a351 100644 --- a/models/plugin/server/types.go +++ b/models/plugin/server/types.go @@ -55,3 +55,10 @@ type NewWorkConnContent struct { User UserInfo `json:"user"` msg.NewWorkConn } + +type NewUserConnContent struct { + User UserInfo `json:"user"` + ProxyName string `json:"proxy_name"` + ProxyType string `json:"proxy_type"` + RemoteAddr string `json:"remote_addr"` +} diff --git a/models/proto/udp/udp.go b/models/proto/udp/udp.go index ed7f95a..8ae1db6 100644 --- a/models/proto/udp/udp.go +++ b/models/proto/udp/udp.go @@ -57,11 +57,11 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh for { n, remoteAddr, err := udpConn.ReadFromUDP(buf) if err != nil { - udpConn.Close() return } // buf[:n] will be encoded to string, so the bytes can be reused udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr) + select { case sendCh <- udpMsg: default: diff --git a/package.sh b/package.sh index e7556db..2f9a0f2 100755 --- a/package.sh +++ b/package.sh @@ -1,5 +1,5 @@ # compile for version -#make +make if [ $? -ne 0 ]; then echo "make error" exit 1 @@ -9,7 +9,7 @@ frp_version=`./bin/frps --version` echo "build version: $frp_version" # cross_compiles -#make -f ./Makefile.cross-compiles +make -f ./Makefile.cross-compiles rm -rf ./release/packages mkdir -p ./release/packages diff --git a/server/control.go b/server/control.go index 4d7529e..017ac39 100644 --- a/server/control.go +++ b/server/control.go @@ -486,9 +486,16 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err return } + // User info + userInfo := plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunId: ctl.runId, + } + // NewProxy will return a interface Proxy. // In fact it create different proxies by different proxy type, we just call run() here. - pxy, err := proxy.NewProxy(ctl.ctx, ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg) + pxy, err := proxy.NewProxy(ctl.ctx, userInfo, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg) if err != nil { return remoteAddr, err } diff --git a/server/controller/resource.go b/server/controller/resource.go index 5098dfb..b0c6ea2 100644 --- a/server/controller/resource.go +++ b/server/controller/resource.go @@ -16,6 +16,7 @@ package controller import ( "github.com/fatedier/frp/models/nathole" + plugin "github.com/fatedier/frp/models/plugin/server" "github.com/fatedier/frp/server/group" "github.com/fatedier/frp/server/ports" "github.com/fatedier/frp/utils/tcpmux" @@ -33,6 +34,9 @@ type ResourceController struct { // HTTP Group Controller HTTPGroupCtl *group.HTTPGroupController + // TCP Mux Group Controller + TcpMuxGroupCtl *group.TcpMuxGroupCtl + // Manage all tcp ports TcpPortManager *ports.PortManager @@ -50,4 +54,7 @@ type ResourceController struct { // TcpMux HTTP CONNECT multiplexer TcpMuxHttpConnectMuxer *tcpmux.HttpConnectTcpMuxer + + // All server manager plugin + PluginManager *plugin.Manager } diff --git a/server/group/tcpmux.go b/server/group/tcpmux.go new file mode 100644 index 0000000..4cae5b8 --- /dev/null +++ b/server/group/tcpmux.go @@ -0,0 +1,218 @@ +// Copyright 2020 guylewin, guy@lewin.co.il +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package group + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/utils/tcpmux" + "github.com/fatedier/frp/utils/vhost" + + gerr "github.com/fatedier/golib/errors" +) + +// TcpMuxGroupCtl manage all TcpMuxGroups +type TcpMuxGroupCtl struct { + groups map[string]*TcpMuxGroup + + // portManager is used to manage port + tcpMuxHttpConnectMuxer *tcpmux.HttpConnectTcpMuxer + mu sync.Mutex +} + +// NewTcpMuxGroupCtl return a new TcpMuxGroupCtl +func NewTcpMuxGroupCtl(tcpMuxHttpConnectMuxer *tcpmux.HttpConnectTcpMuxer) *TcpMuxGroupCtl { + return &TcpMuxGroupCtl{ + groups: make(map[string]*TcpMuxGroup), + tcpMuxHttpConnectMuxer: tcpMuxHttpConnectMuxer, + } +} + +// Listen is the wrapper for TcpMuxGroup's Listen +// If there are no group, we will create one here +func (tmgc *TcpMuxGroupCtl) Listen(multiplexer string, group string, groupKey string, + domain string, ctx context.Context) (l net.Listener, err error) { + tmgc.mu.Lock() + tcpMuxGroup, ok := tmgc.groups[group] + if !ok { + tcpMuxGroup = NewTcpMuxGroup(tmgc) + tmgc.groups[group] = tcpMuxGroup + } + tmgc.mu.Unlock() + + switch multiplexer { + case consts.HttpConnectTcpMultiplexer: + return tcpMuxGroup.HttpConnectListen(group, groupKey, domain, ctx) + default: + err = fmt.Errorf("unknown multiplexer [%s]", multiplexer) + return + } +} + +// RemoveGroup remove TcpMuxGroup from controller +func (tmgc *TcpMuxGroupCtl) RemoveGroup(group string) { + tmgc.mu.Lock() + defer tmgc.mu.Unlock() + delete(tmgc.groups, group) +} + +// TcpMuxGroup route connections to different proxies +type TcpMuxGroup struct { + group string + groupKey string + domain string + + acceptCh chan net.Conn + index uint64 + tcpMuxLn net.Listener + lns []*TcpMuxGroupListener + ctl *TcpMuxGroupCtl + mu sync.Mutex +} + +// NewTcpMuxGroup return a new TcpMuxGroup +func NewTcpMuxGroup(ctl *TcpMuxGroupCtl) *TcpMuxGroup { + return &TcpMuxGroup{ + lns: make([]*TcpMuxGroupListener, 0), + ctl: ctl, + acceptCh: make(chan net.Conn), + } +} + +// Listen will return a new TcpMuxGroupListener +// if TcpMuxGroup already has a listener, just add a new TcpMuxGroupListener to the queues +// otherwise, listen on the real address +func (tmg *TcpMuxGroup) HttpConnectListen(group string, groupKey string, domain string, context context.Context) (ln *TcpMuxGroupListener, err error) { + tmg.mu.Lock() + defer tmg.mu.Unlock() + if len(tmg.lns) == 0 { + // the first listener, listen on the real address + routeConfig := &vhost.VhostRouteConfig{ + Domain: domain, + } + tcpMuxLn, errRet := tmg.ctl.tcpMuxHttpConnectMuxer.Listen(context, routeConfig) + if errRet != nil { + return nil, errRet + } + ln = newTcpMuxGroupListener(group, tmg, tcpMuxLn.Addr()) + + tmg.group = group + tmg.groupKey = groupKey + tmg.domain = domain + tmg.tcpMuxLn = tcpMuxLn + tmg.lns = append(tmg.lns, ln) + if tmg.acceptCh == nil { + tmg.acceptCh = make(chan net.Conn) + } + go tmg.worker() + } else { + // domain in the same group must be equal + if tmg.group != group || tmg.domain != domain { + return nil, ErrGroupParamsInvalid + } + if tmg.groupKey != groupKey { + return nil, ErrGroupAuthFailed + } + ln = newTcpMuxGroupListener(group, tmg, tmg.lns[0].Addr()) + tmg.lns = append(tmg.lns, ln) + } + return +} + +// worker is called when the real tcp listener has been created +func (tmg *TcpMuxGroup) worker() { + for { + c, err := tmg.tcpMuxLn.Accept() + if err != nil { + return + } + err = gerr.PanicToError(func() { + tmg.acceptCh <- c + }) + if err != nil { + return + } + } +} + +func (tmg *TcpMuxGroup) Accept() <-chan net.Conn { + return tmg.acceptCh +} + +// CloseListener remove the TcpMuxGroupListener from the TcpMuxGroup +func (tmg *TcpMuxGroup) CloseListener(ln *TcpMuxGroupListener) { + tmg.mu.Lock() + defer tmg.mu.Unlock() + for i, tmpLn := range tmg.lns { + if tmpLn == ln { + tmg.lns = append(tmg.lns[:i], tmg.lns[i+1:]...) + break + } + } + if len(tmg.lns) == 0 { + close(tmg.acceptCh) + tmg.tcpMuxLn.Close() + tmg.ctl.RemoveGroup(tmg.group) + } +} + +// TcpMuxGroupListener +type TcpMuxGroupListener struct { + groupName string + group *TcpMuxGroup + + addr net.Addr + closeCh chan struct{} +} + +func newTcpMuxGroupListener(name string, group *TcpMuxGroup, addr net.Addr) *TcpMuxGroupListener { + return &TcpMuxGroupListener{ + groupName: name, + group: group, + addr: addr, + closeCh: make(chan struct{}), + } +} + +// Accept will accept connections from TcpMuxGroup +func (ln *TcpMuxGroupListener) Accept() (c net.Conn, err error) { + var ok bool + select { + case <-ln.closeCh: + return nil, ErrListenerClosed + case c, ok = <-ln.group.Accept(): + if !ok { + return nil, ErrListenerClosed + } + return c, nil + } +} + +func (ln *TcpMuxGroupListener) Addr() net.Addr { + return ln.addr +} + +// Close close the listener +func (ln *TcpMuxGroupListener) Close() (err error) { + close(ln.closeCh) + + // remove self from TcpMuxGroup + ln.group.CloseListener(ln) + return +} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 7f86212..d3668a1 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -24,6 +24,7 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" + plugin "github.com/fatedier/frp/models/plugin/server" "github.com/fatedier/frp/server/controller" "github.com/fatedier/frp/server/metrics" frpNet "github.com/fatedier/frp/utils/net" @@ -41,6 +42,8 @@ type Proxy interface { GetConf() config.ProxyConf GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) GetUsedPortsNum() int + GetResourceController() *controller.ResourceController + GetUserInfo() plugin.UserInfo Close() } @@ -52,6 +55,7 @@ type BaseProxy struct { poolCount int getWorkConnFn GetWorkConnFn serverCfg config.ServerCommonConf + userInfo plugin.UserInfo mu sync.RWMutex xl *xlog.Logger @@ -70,6 +74,14 @@ func (pxy *BaseProxy) GetUsedPortsNum() int { return pxy.usedPortsNum } +func (pxy *BaseProxy) GetResourceController() *controller.ResourceController { + return pxy.rc +} + +func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo { + return pxy.userInfo +} + func (pxy *BaseProxy) Close() { xl := xlog.FromContextSafe(pxy.ctx) xl.Info("proxy closing") @@ -154,7 +166,7 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, } } -func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, poolCount int, +func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) { xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName) @@ -167,6 +179,7 @@ func NewProxy(ctx context.Context, runId string, rc *controller.ResourceControll serverCfg: serverCfg, xl: xl, ctx: xlog.NewContext(ctx, xl), + userInfo: userInfo, } switch cfg := pxyConf.(type) { case *config.TcpProxyConf: @@ -206,6 +219,11 @@ func NewProxy(ctx context.Context, runId string, rc *controller.ResourceControll BaseProxy: &basePxy, cfg: cfg, } + case *config.SudpProxyConf: + pxy = &SudpProxy{ + BaseProxy: &basePxy, + cfg: cfg, + } default: return pxy, fmt.Errorf("proxy type not support") } @@ -218,6 +236,20 @@ func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv xl := xlog.FromContextSafe(pxy.Context()) defer userConn.Close() + // server plugin hook + rc := pxy.GetResourceController() + content := &plugin.NewUserConnContent{ + User: pxy.GetUserInfo(), + ProxyName: pxy.GetName(), + ProxyType: pxy.GetConf().GetBaseInfo().ProxyType, + RemoteAddr: userConn.RemoteAddr().String(), + } + _, err := rc.PluginManager.NewUserConn(content) + if err != nil { + xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err) + return + } + // try all connections from the pool workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) if err != nil { diff --git a/server/proxy/sudp.go b/server/proxy/sudp.go new file mode 100644 index 0000000..2916334 --- /dev/null +++ b/server/proxy/sudp.go @@ -0,0 +1,48 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "github.com/fatedier/frp/models/config" +) + +type SudpProxy struct { + *BaseProxy + cfg *config.SudpProxyConf +} + +func (pxy *SudpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl + + listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) + if errRet != nil { + err = errRet + return + } + pxy.listeners = append(pxy.listeners, listener) + xl.Info("sudp proxy custom listen success") + + pxy.startListenHandler(pxy, HandleUserTcpConnection) + return +} + +func (pxy *SudpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *SudpProxy) Close() { + pxy.BaseProxy.Close() + pxy.rc.VisitorManager.CloseListener(pxy.GetName()) +} diff --git a/server/proxy/tcpmux.go b/server/proxy/tcpmux.go index 108e68d..6af61c8 100644 --- a/server/proxy/tcpmux.go +++ b/server/proxy/tcpmux.go @@ -16,6 +16,7 @@ package proxy import ( "fmt" + "net" "strings" "github.com/fatedier/frp/models/config" @@ -27,21 +28,24 @@ import ( type TcpMuxProxy struct { *BaseProxy cfg *config.TcpMuxProxyConf - - realPort int } -func (pxy *TcpMuxProxy) httpConnectListen(domain string, addrs []string) ([]string, error) { - routeConfig := &vhost.VhostRouteConfig{ - Domain: domain, +func (pxy *TcpMuxProxy) httpConnectListen(domain string, addrs []string) (_ []string, err error) { + var l net.Listener + if pxy.cfg.Group != "" { + l, err = pxy.rc.TcpMuxGroupCtl.Listen(pxy.cfg.Multiplexer, pxy.cfg.Group, pxy.cfg.GroupKey, domain, pxy.ctx) + } else { + routeConfig := &vhost.VhostRouteConfig{ + Domain: domain, + } + l, err = pxy.rc.TcpMuxHttpConnectMuxer.Listen(pxy.ctx, routeConfig) } - l, err := pxy.rc.TcpMuxHttpConnectMuxer.Listen(pxy.ctx, routeConfig) if err != nil { return nil, err } - pxy.xl.Info("tcpmux httpconnect multiplexer listens for host [%s]", routeConfig.Domain) + pxy.xl.Info("tcpmux httpconnect multiplexer listens for host [%s]", domain) pxy.listeners = append(pxy.listeners, l) - return append(addrs, util.CanonicalAddr(routeConfig.Domain, pxy.serverCfg.TcpMuxHttpConnectPort)), nil + return append(addrs, util.CanonicalAddr(domain, pxy.serverCfg.TcpMuxHttpConnectPort)), nil } func (pxy *TcpMuxProxy) httpConnectRun() (remoteAddr string, err error) { @@ -89,7 +93,4 @@ func (pxy *TcpMuxProxy) GetConf() config.ProxyConf { func (pxy *TcpMuxProxy) Close() { pxy.BaseProxy.Close() - if pxy.cfg.Group == "" { - pxy.rc.TcpPortManager.Release(pxy.realPort) - } } diff --git a/server/service.go b/server/service.go index d3c3169..4f1c702 100644 --- a/server/service.go +++ b/server/service.go @@ -114,11 +114,29 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { cfg: cfg, } + // Create tcpmux httpconnect multiplexer. + if cfg.TcpMuxHttpConnectPort > 0 { + var l net.Listener + l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.TcpMuxHttpConnectPort)) + if err != nil { + err = fmt.Errorf("Create server listener error, %v", err) + return + } + + svr.rc.TcpMuxHttpConnectMuxer, err = tcpmux.NewHttpConnectTcpMuxer(l, vhostReadWriteTimeout) + if err != nil { + err = fmt.Errorf("Create vhost tcpMuxer error, %v", err) + return + } + log.Info("tcpmux httpconnect multiplexer listen on %s:%d", cfg.ProxyBindAddr, cfg.TcpMuxHttpConnectPort) + } + // Init all plugins for name, options := range cfg.HTTPPlugins { svr.pluginManager.Register(plugin.NewHTTPPluginOptions(options)) log.Info("plugin [%s] has been registered", name) } + svr.rc.PluginManager = svr.pluginManager // Init group controller svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager) @@ -126,6 +144,9 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { // Init HTTP group controller svr.rc.HTTPGroupCtl = group.NewHTTPGroupController(svr.httpVhostRouter) + // Init TCP mux group controller + svr.rc.TcpMuxGroupCtl = group.NewTcpMuxGroupCtl(svr.rc.TcpMuxHttpConnectMuxer) + // Init 404 not found page vhost.NotFoundPagePath = cfg.Custom404Page @@ -220,23 +241,6 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { log.Info("https service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort) } - // Create tcpmux httpconnect multiplexer. - if cfg.TcpMuxHttpConnectPort > 0 { - var l net.Listener - l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.TcpMuxHttpConnectPort)) - if err != nil { - err = fmt.Errorf("Create server listener error, %v", err) - return - } - - svr.rc.TcpMuxHttpConnectMuxer, err = tcpmux.NewHttpConnectTcpMuxer(l, vhostReadWriteTimeout) - if err != nil { - err = fmt.Errorf("Create vhost tcpMuxer error, %v", err) - return - } - log.Info("tcpmux httpconnect multiplexer listen on %s:%d", cfg.ProxyBindAddr, cfg.TcpMuxHttpConnectPort) - } - // frp tls listener svr.tlsListener = svr.muxer.Listen(1, 1, func(data []byte) bool { return int(data[0]) == frpNet.FRP_TLS_HEAD_BYTE @@ -296,6 +300,68 @@ func (svr *Service) Run() { svr.HandleListener(svr.listener) } +func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) { + xl := xlog.FromContextSafe(ctx) + + var ( + rawMsg msg.Message + err error + ) + + conn.SetReadDeadline(time.Now().Add(connReadTimeout)) + if rawMsg, err = msg.ReadMsg(conn); err != nil { + log.Trace("Failed to read message: %v", err) + conn.Close() + return + } + conn.SetReadDeadline(time.Time{}) + + switch m := rawMsg.(type) { + case *msg.Login: + // server plugin hook + content := &plugin.LoginContent{ + Login: *m, + } + retContent, err := svr.pluginManager.Login(content) + if err == nil { + m = &retContent.Login + err = svr.RegisterControl(conn, m) + } + + // If login failed, send error message there. + // Otherwise send success message in control's work goroutine. + if err != nil { + xl.Warn("register control error: %v", err) + msg.WriteMsg(conn, &msg.LoginResp{ + Version: version.Full(), + Error: util.GenerateResponseErrorString("register control error", err, svr.cfg.DetailedErrorsToClient), + }) + conn.Close() + } + case *msg.NewWorkConn: + if err := svr.RegisterWorkConn(conn, m); err != nil { + conn.Close() + } + case *msg.NewVisitorConn: + if err = svr.RegisterVisitorConn(conn, m); err != nil { + xl.Warn("register visitor conn error: %v", err) + msg.WriteMsg(conn, &msg.NewVisitorConnResp{ + ProxyName: m.ProxyName, + Error: util.GenerateResponseErrorString("register visitor conn error", err, svr.cfg.DetailedErrorsToClient), + }) + conn.Close() + } else { + msg.WriteMsg(conn, &msg.NewVisitorConnResp{ + ProxyName: m.ProxyName, + Error: "", + }) + } + default: + log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String()) + conn.Close() + } +} + func (svr *Service) HandleListener(l net.Listener) { // Listen for incoming connections from client. for { @@ -306,7 +372,9 @@ func (svr *Service) HandleListener(l net.Listener) { } // inject xlog object into net.Conn context xl := xlog.New() - c = frpNet.NewContextConn(c, xlog.NewContext(context.Background(), xl)) + ctx := context.Background() + + c = frpNet.NewContextConn(c, xlog.NewContext(ctx, xl)) log.Trace("start check TLS connection...") originConn := c @@ -319,63 +387,7 @@ func (svr *Service) HandleListener(l net.Listener) { log.Trace("success check TLS connection") // Start a new goroutine for dealing connections. - go func(frpConn net.Conn) { - dealFn := func(conn net.Conn) { - var rawMsg msg.Message - conn.SetReadDeadline(time.Now().Add(connReadTimeout)) - if rawMsg, err = msg.ReadMsg(conn); err != nil { - log.Trace("Failed to read message: %v", err) - conn.Close() - return - } - conn.SetReadDeadline(time.Time{}) - - switch m := rawMsg.(type) { - case *msg.Login: - // server plugin hook - content := &plugin.LoginContent{ - Login: *m, - } - retContent, err := svr.pluginManager.Login(content) - if err == nil { - m = &retContent.Login - err = svr.RegisterControl(conn, m) - } - - // If login failed, send error message there. - // Otherwise send success message in control's work goroutine. - if err != nil { - xl.Warn("register control error: %v", err) - msg.WriteMsg(conn, &msg.LoginResp{ - Version: version.Full(), - Error: util.GenerateResponseErrorString("register control error", err, svr.cfg.DetailedErrorsToClient), - }) - conn.Close() - } - case *msg.NewWorkConn: - if err := svr.RegisterWorkConn(conn, m); err != nil { - conn.Close() - } - case *msg.NewVisitorConn: - if err = svr.RegisterVisitorConn(conn, m); err != nil { - xl.Warn("register visitor conn error: %v", err) - msg.WriteMsg(conn, &msg.NewVisitorConnResp{ - ProxyName: m.ProxyName, - Error: util.GenerateResponseErrorString("register visitor conn error", err, svr.cfg.DetailedErrorsToClient), - }) - conn.Close() - } else { - msg.WriteMsg(conn, &msg.NewVisitorConnResp{ - ProxyName: m.ProxyName, - Error: "", - }) - } - default: - log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String()) - conn.Close() - } - } - + go func(ctx context.Context, frpConn net.Conn) { if svr.cfg.TcpMux { fmuxCfg := fmux.DefaultConfig() fmuxCfg.KeepAliveInterval = 20 * time.Second @@ -394,12 +406,12 @@ func (svr *Service) HandleListener(l net.Listener) { session.Close() return } - go dealFn(stream) + go svr.handleConnection(ctx, stream) } } else { - dealFn(frpConn) + svr.handleConnection(ctx, frpConn) } - }(c) + }(ctx, c) } } diff --git a/tests/ci/auto_test_frpc.ini b/tests/ci/auto_test_frpc.ini index fbcf971..46f1279 100644 --- a/tests/ci/auto_test_frpc.ini +++ b/tests/ci/auto_test_frpc.ini @@ -72,6 +72,12 @@ local_port = 10701 use_encryption = true use_compression = true +[sudp] +type = sudp +sk = abcdefg +local_ip = 127.0.0.1 +local_port = 10702 + [web01] type = http local_ip = 127.0.0.1 diff --git a/tests/ci/auto_test_frpc_visitor.ini b/tests/ci/auto_test_frpc_visitor.ini index 660c793..2ed3af5 100644 --- a/tests/ci/auto_test_frpc_visitor.ini +++ b/tests/ci/auto_test_frpc_visitor.ini @@ -23,3 +23,12 @@ bind_addr = 127.0.0.1 bind_port = 10905 use_encryption = true use_compression = true + + +[sudp_visitor] +type = sudp +role = visitor +server_name = sudp +sk = abcdefg +bind_addr = 127.0.0.1 +bind_port = 10816 diff --git a/tests/ci/normal_test.go b/tests/ci/normal_test.go index 572fec0..6f1a8ad 100644 --- a/tests/ci/normal_test.go +++ b/tests/ci/normal_test.go @@ -118,6 +118,16 @@ func TestStcp(t *testing.T) { } } +func TestSudp(t *testing.T) { + assert := assert.New(t) + // Normal + addr := fmt.Sprintf("127.0.0.1:%d", consts.TEST_SUDP_FRP_PORT) + res, err := util.SendUdpMsg(addr, consts.TEST_SUDP_ECHO_STR) + + assert.NoError(err) + assert.Equal(consts.TEST_SUDP_ECHO_STR, res) +} + func TestHttp(t *testing.T) { assert := assert.New(t) // web01 diff --git a/tests/consts/consts.go b/tests/consts/consts.go index 5fc8857..e7f970e 100644 --- a/tests/consts/consts.go +++ b/tests/consts/consts.go @@ -46,6 +46,9 @@ var ( TEST_STCP_EC_FRP_PORT int = 10905 TEST_STCP_ECHO_STR string = "stcp type:" + TEST_STR + TEST_SUDP_FRP_PORT int = 10816 + TEST_SUDP_ECHO_STR string = "sudp type:" + TEST_STR + ProxyTcpPortNotAllowed string = "tcp_port_not_allowed" ProxyTcpPortUnavailable string = "tcp_port_unavailable" ProxyTcpPortNormal string = "tcp_port_normal" diff --git a/tests/util/util.go b/tests/util/util.go index 163ddc2..3dfd28f 100644 --- a/tests/util/util.go +++ b/tests/util/util.go @@ -71,6 +71,11 @@ func GetProxyStatus(statusAddr string, user string, passwd string, name string) return &s, nil } } + for _, s := range allStatus.Sudp { + if s.Name == name { + return &s, nil + } + } return status, errors.New("no proxy status found") } diff --git a/utils/version/version.go b/utils/version/version.go index 32779e5..ec13f8c 100644 --- a/utils/version/version.go +++ b/utils/version/version.go @@ -19,7 +19,7 @@ import ( "strings" ) -var version string = "0.32.1" +var version string = "0.33.0" func Full() string { return version