From b600a07ec0f7a33a3cdf1fd3ead2cd98d306a970 Mon Sep 17 00:00:00 2001 From: fatedier Date: Wed, 17 May 2017 17:47:20 +0800 Subject: [PATCH] support tcp stream multiplexing by smux --- client/control.go | 46 ++++++++++++++-- conf/frpc.ini | 23 +++++--- conf/frps.ini | 5 +- models/config/client_common.go | 9 ++++ models/config/server_common.go | 11 +++- server/service.go | 97 +++++++++++++++++++++------------- utils/net/conn.go | 12 +++++ 7 files changed, 153 insertions(+), 50 deletions(-) diff --git a/client/control.go b/client/control.go index 928bb64..9e33079 100644 --- a/client/control.go +++ b/client/control.go @@ -28,6 +28,7 @@ import ( "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" + "github.com/xtaci/smux" ) const ( @@ -50,6 +51,9 @@ type Control struct { // control connection conn net.Conn + // tcp stream multiplexing, if enabled + session *smux.Session + // put a message in this channel to send it over control connection to server sendCh chan (msg.Message) @@ -122,11 +126,25 @@ func (ctl *Control) Run() error { } func (ctl *Control) NewWorkConn() { - workConn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, - fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) - if err != nil { - ctl.Warn("start new work connection error: %v", err) - return + var ( + workConn net.Conn + err error + ) + if config.ClientCommonCfg.TcpMux { + stream, err := ctl.session.OpenStream() + if err != nil { + ctl.Warn("start new work connection error: %v", err) + return + } + workConn = net.WrapConn(stream) + + } else { + workConn, err = net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, + fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) + if err != nil { + ctl.Warn("start new work connection error: %v", err) + return + } } m := &msg.NewWorkConn{ @@ -166,6 +184,10 @@ func (ctl *Control) login() (err error) { if ctl.conn != nil { ctl.conn.Close() } + if ctl.session != nil { + ctl.session.Close() + } + conn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) if err != nil { @@ -178,6 +200,20 @@ func (ctl *Control) login() (err error) { } }() + if config.ClientCommonCfg.TcpMux { + session, errRet := smux.Client(conn, nil) + if errRet != nil { + return errRet + } + stream, errRet := session.OpenStream() + if errRet != nil { + session.Close() + return errRet + } + conn = net.WrapConn(stream) + ctl.session = session + } + now := time.Now().Unix() ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now) ctl.loginMsg.Timestamp = now diff --git a/conf/frpc.ini b/conf/frpc.ini index 16bb0ad..6c620ec 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -22,6 +22,9 @@ privilege_token = 12345678 # connections will be established in advance, default value is zero pool_count = 5 +# if tcp stream multiplexing is used, default is true, it must be same with frps +tcp_mux = true + # your proxy name will be changed to {user}.{proxy} user = your_name @@ -48,12 +51,16 @@ remote_port = 6001 type = udp local_ip = 114.114.114.114 local_port = 53 +remote_port = 6002 +use_encryption = false +use_compression = false # Resolve your domain names to [server_addr] so you can use http://web01.yourdomain.com to browse web01 and http://web02.yourdomain.com to browse web02 [web01] type = http local_ip = 127.0.0.1 local_port = 80 +use_encryption = false use_compression = true # http username and password are safety certification for http protocol # if not set, you can access this custom_domains without certification @@ -61,14 +68,16 @@ http_user = admin http_pwd = admin # if domain for frps is frps.com, then you can access [web01] proxy by URL http://test.frps.com subdomain = web01 - -[web02] -type = http -local_ip = 127.0.0.1 -local_port = 8000 -use_encryption = false -use_compression = false custom_domains = web02.yourdomain.com # locations is only useful for http type locations = /,/pic host_header_rewrite = example.com + +[web02] +type = https +local_ip = 127.0.0.1 +local_port = 8000 +use_encryption = false +use_compression = false +subdomain = web01 +custom_domains = web02.yourdomain.com diff --git a/conf/frps.ini b/conf/frps.ini index 6b50116..d28f8f3 100644 --- a/conf/frps.ini +++ b/conf/frps.ini @@ -37,7 +37,7 @@ privilege_token = 12345678 privilege_allow_ports = 2000-3000,3001,3003,4000-50000 # pool_count in each proxy will change to max_pool_count if they exceed the maximum value -max_pool_count = 10 +max_pool_count = 5 # authentication_timeout means the timeout interval (seconds) when the frpc connects frps # if authentication_timeout is zero, the time is not verified, default is 900s @@ -46,3 +46,6 @@ authentication_timeout = 900 # if subdomain_host is not empty, you can set subdomain when type is http or https in frpc's configure file # when subdomain is test, the host used by routing is test.frps.com subdomain_host = frps.com + +# if tcp stream multiplexing is used, default is true +tcp_mux = true diff --git a/models/config/client_common.go b/models/config/client_common.go index 6bd8f35..3384a37 100644 --- a/models/config/client_common.go +++ b/models/config/client_common.go @@ -36,6 +36,7 @@ type ClientCommonConf struct { LogMaxDays int64 PrivilegeToken string PoolCount int + TcpMux bool User string HeartBeatInterval int64 HeartBeatTimeout int64 @@ -53,6 +54,7 @@ func GetDeaultClientCommonConf() *ClientCommonConf { LogMaxDays: 3, PrivilegeToken: "", PoolCount: 1, + TcpMux: true, User: "", HeartBeatInterval: 30, HeartBeatTimeout: 90, @@ -120,6 +122,13 @@ func LoadClientCommonConf(conf ini.File) (cfg *ClientCommonConf, err error) { } } + tmpStr, ok = conf.Get("common", "tcp_mux") + if ok && tmpStr == "false" { + cfg.TcpMux = false + } else { + cfg.TcpMux = true + } + tmpStr, ok = conf.Get("common", "user") if ok { cfg.User = tmpStr diff --git a/models/config/server_common.go b/models/config/server_common.go index eff4d6d..070de8a 100644 --- a/models/config/server_common.go +++ b/models/config/server_common.go @@ -49,6 +49,7 @@ type ServerCommonConf struct { PrivilegeToken string AuthTimeout int64 SubDomainHost string + TcpMux bool // if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected PrivilegeAllowPorts map[int64]struct{} @@ -76,7 +77,8 @@ func GetDefaultServerCommonConf() *ServerCommonConf { PrivilegeToken: "", AuthTimeout: 900, SubDomainHost: "", - MaxPoolCount: 10, + TcpMux: true, + MaxPoolCount: 5, HeartBeatTimeout: 90, UserConnTimeout: 10, } @@ -265,6 +267,13 @@ func LoadServerCommonConf(conf ini.File) (cfg *ServerCommonConf, err error) { cfg.SubDomainHost = strings.ToLower(strings.TrimSpace(tmpStr)) } + tmpStr, ok = conf.Get("common", "tcp_mux") + if ok && tmpStr == "false" { + cfg.TcpMux = false + } else { + cfg.TcpMux = true + } + tmpStr, ok = conf.Get("common", "heartbeat_timeout") if ok { v, errRet := strconv.ParseInt(tmpStr, 10, 64) diff --git a/server/service.go b/server/service.go index 39adf20..bfc6569 100644 --- a/server/service.go +++ b/server/service.go @@ -22,10 +22,12 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/log" - "github.com/fatedier/frp/utils/net" + frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" "github.com/fatedier/frp/utils/vhost" + + "github.com/xtaci/smux" ) const ( @@ -37,7 +39,7 @@ var ServerService *Service // Server service. type Service struct { // Accept connections from client. - listener net.Listener + listener frpNet.Listener // For http proxies, route requests to different clients by hostname and other infomation. VhostHttpMuxer *vhost.HttpMuxer @@ -66,7 +68,7 @@ func NewService() (svr *Service, err error) { } // Listen for accepting connections from client. - svr.listener, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + svr.listener, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) if err != nil { err = fmt.Errorf("Create server listener error, %v", err) return @@ -74,8 +76,8 @@ func NewService() (svr *Service, err error) { // Create http vhost muxer. if config.ServerCommonCfg.VhostHttpPort != 0 { - var l net.Listener - l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort) + var l frpNet.Listener + l, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort) if err != nil { err = fmt.Errorf("Create vhost http listener error, %v", err) return @@ -89,8 +91,8 @@ func NewService() (svr *Service, err error) { // Create https vhost muxer. if config.ServerCommonCfg.VhostHttpsPort != 0 { - var l net.Listener - l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort) + var l frpNet.Listener + l, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort) if err != nil { err = fmt.Errorf("Create vhost https listener error, %v", err) return @@ -123,40 +125,63 @@ func (svr *Service) Run() { } // Start a new goroutine for dealing connections. - go func(frpConn net.Conn) { - var rawMsg msg.Message - frpConn.SetReadDeadline(time.Now().Add(connReadTimeout)) - if rawMsg, err = msg.ReadMsg(frpConn); err != nil { - log.Warn("Failed to read message: %v", err) - frpConn.Close() - return - } - frpConn.SetReadDeadline(time.Time{}) - - switch m := rawMsg.(type) { - case *msg.Login: - err = svr.RegisterControl(frpConn, m) - // If login failed, send error message there. - // Otherwise send success message in control's work goroutine. - if err != nil { - frpConn.Warn("%v", err) - msg.WriteMsg(frpConn, &msg.LoginResp{ - Version: version.Full(), - Error: err.Error(), - }) - frpConn.Close() + go func(frpConn frpNet.Conn) { + dealFn := func(conn frpNet.Conn) { + var rawMsg msg.Message + conn.SetReadDeadline(time.Now().Add(connReadTimeout)) + if rawMsg, err = msg.ReadMsg(conn); err != nil { + log.Warn("Failed to read message: %v", err) + conn.Close() + return } - case *msg.NewWorkConn: - svr.RegisterWorkConn(frpConn, m) - default: - log.Warn("Error message type for the new connection [%s]", frpConn.RemoteAddr().String()) - frpConn.Close() + conn.SetReadDeadline(time.Time{}) + + switch m := rawMsg.(type) { + case *msg.Login: + err = svr.RegisterControl(conn, m) + // If login failed, send error message there. + // Otherwise send success message in control's work goroutine. + if err != nil { + conn.Warn("%v", err) + msg.WriteMsg(conn, &msg.LoginResp{ + Version: version.Full(), + Error: err.Error(), + }) + conn.Close() + } + case *msg.NewWorkConn: + svr.RegisterWorkConn(conn, m) + default: + log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String()) + conn.Close() + } + } + + if config.ServerCommonCfg.TcpMux { + session, err := smux.Server(frpConn, nil) + if err != nil { + log.Warn("Failed to create mux connection: %v", err) + frpConn.Close() + return + } + + for { + stream, err := session.AcceptStream() + if err != nil { + log.Warn("Accept new mux stream error: %v", err) + return + } + wrapConn := frpNet.WrapConn(stream) + go dealFn(wrapConn) + } + } else { + dealFn(frpConn) } }(c) } } -func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) { +func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (err error) { ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]", ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch) @@ -201,7 +226,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err } // RegisterWorkConn register a new work connection to control and proxies need it. -func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) { +func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) { ctl, exist := svr.ctlManager.GetById(newMsg.RunId) if !exist { workConn.Warn("No client control found for run id [%s]", newMsg.RunId) diff --git a/utils/net/conn.go b/utils/net/conn.go index 0189fdf..ed995c8 100644 --- a/utils/net/conn.go +++ b/utils/net/conn.go @@ -26,6 +26,18 @@ type Conn interface { log.Logger } +type WrapLogConn struct { + net.Conn + log.Logger +} + +func WrapConn(c net.Conn) Conn { + return WrapLogConn{ + Conn: c, + Logger: log.NewPrefixLogger(""), + } +} + type Listener interface { Accept() (Conn, error) Close() error