support tcp stream multiplexing by smux

This commit is contained in:
fatedier 2017-05-17 17:47:20 +08:00
parent a5f06489cb
commit b600a07ec0
7 changed files with 153 additions and 50 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/xtaci/smux"
)
const (
@ -50,6 +51,9 @@ type Control struct {
// control connection
conn net.Conn
// tcp stream multiplexing, if enabled
session *smux.Session
// put a message in this channel to send it over control connection to server
sendCh chan (msg.Message)
@ -122,12 +126,26 @@ func (ctl *Control) Run() error {
}
func (ctl *Control) NewWorkConn() {
workConn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy,
var (
workConn net.Conn
err error
)
if config.ClientCommonCfg.TcpMux {
stream, err := ctl.session.OpenStream()
if err != nil {
ctl.Warn("start new work connection error: %v", err)
return
}
workConn = net.WrapConn(stream)
} else {
workConn, err = net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy,
fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
if err != nil {
ctl.Warn("start new work connection error: %v", err)
return
}
}
m := &msg.NewWorkConn{
RunId: ctl.runId,
@ -166,6 +184,10 @@ func (ctl *Control) login() (err error) {
if ctl.conn != nil {
ctl.conn.Close()
}
if ctl.session != nil {
ctl.session.Close()
}
conn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy,
fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
if err != nil {
@ -178,6 +200,20 @@ func (ctl *Control) login() (err error) {
}
}()
if config.ClientCommonCfg.TcpMux {
session, errRet := smux.Client(conn, nil)
if errRet != nil {
return errRet
}
stream, errRet := session.OpenStream()
if errRet != nil {
session.Close()
return errRet
}
conn = net.WrapConn(stream)
ctl.session = session
}
now := time.Now().Unix()
ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now)
ctl.loginMsg.Timestamp = now

View File

@ -22,6 +22,9 @@ privilege_token = 12345678
# connections will be established in advance, default value is zero
pool_count = 5
# if tcp stream multiplexing is used, default is true, it must be same with frps
tcp_mux = true
# your proxy name will be changed to {user}.{proxy}
user = your_name
@ -48,12 +51,16 @@ remote_port = 6001
type = udp
local_ip = 114.114.114.114
local_port = 53
remote_port = 6002
use_encryption = false
use_compression = false
# Resolve your domain names to [server_addr] so you can use http://web01.yourdomain.com to browse web01 and http://web02.yourdomain.com to browse web02
[web01]
type = http
local_ip = 127.0.0.1
local_port = 80
use_encryption = false
use_compression = true
# http username and password are safety certification for http protocol
# if not set, you can access this custom_domains without certification
@ -61,14 +68,16 @@ http_user = admin
http_pwd = admin
# if domain for frps is frps.com, then you can access [web01] proxy by URL http://test.frps.com
subdomain = web01
[web02]
type = http
local_ip = 127.0.0.1
local_port = 8000
use_encryption = false
use_compression = false
custom_domains = web02.yourdomain.com
# locations is only useful for http type
locations = /,/pic
host_header_rewrite = example.com
[web02]
type = https
local_ip = 127.0.0.1
local_port = 8000
use_encryption = false
use_compression = false
subdomain = web01
custom_domains = web02.yourdomain.com

View File

@ -37,7 +37,7 @@ privilege_token = 12345678
privilege_allow_ports = 2000-3000,3001,3003,4000-50000
# pool_count in each proxy will change to max_pool_count if they exceed the maximum value
max_pool_count = 10
max_pool_count = 5
# authentication_timeout means the timeout interval (seconds) when the frpc connects frps
# if authentication_timeout is zero, the time is not verified, default is 900s
@ -46,3 +46,6 @@ authentication_timeout = 900
# if subdomain_host is not empty, you can set subdomain when type is http or https in frpc's configure file
# when subdomain is test, the host used by routing is test.frps.com
subdomain_host = frps.com
# if tcp stream multiplexing is used, default is true
tcp_mux = true

View File

@ -36,6 +36,7 @@ type ClientCommonConf struct {
LogMaxDays int64
PrivilegeToken string
PoolCount int
TcpMux bool
User string
HeartBeatInterval int64
HeartBeatTimeout int64
@ -53,6 +54,7 @@ func GetDeaultClientCommonConf() *ClientCommonConf {
LogMaxDays: 3,
PrivilegeToken: "",
PoolCount: 1,
TcpMux: true,
User: "",
HeartBeatInterval: 30,
HeartBeatTimeout: 90,
@ -120,6 +122,13 @@ func LoadClientCommonConf(conf ini.File) (cfg *ClientCommonConf, err error) {
}
}
tmpStr, ok = conf.Get("common", "tcp_mux")
if ok && tmpStr == "false" {
cfg.TcpMux = false
} else {
cfg.TcpMux = true
}
tmpStr, ok = conf.Get("common", "user")
if ok {
cfg.User = tmpStr

View File

@ -49,6 +49,7 @@ type ServerCommonConf struct {
PrivilegeToken string
AuthTimeout int64
SubDomainHost string
TcpMux bool
// if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected
PrivilegeAllowPorts map[int64]struct{}
@ -76,7 +77,8 @@ func GetDefaultServerCommonConf() *ServerCommonConf {
PrivilegeToken: "",
AuthTimeout: 900,
SubDomainHost: "",
MaxPoolCount: 10,
TcpMux: true,
MaxPoolCount: 5,
HeartBeatTimeout: 90,
UserConnTimeout: 10,
}
@ -265,6 +267,13 @@ func LoadServerCommonConf(conf ini.File) (cfg *ServerCommonConf, err error) {
cfg.SubDomainHost = strings.ToLower(strings.TrimSpace(tmpStr))
}
tmpStr, ok = conf.Get("common", "tcp_mux")
if ok && tmpStr == "false" {
cfg.TcpMux = false
} else {
cfg.TcpMux = true
}
tmpStr, ok = conf.Get("common", "heartbeat_timeout")
if ok {
v, errRet := strconv.ParseInt(tmpStr, 10, 64)

View File

@ -22,10 +22,12 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/net"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/frp/utils/vhost"
"github.com/xtaci/smux"
)
const (
@ -37,7 +39,7 @@ var ServerService *Service
// Server service.
type Service struct {
// Accept connections from client.
listener net.Listener
listener frpNet.Listener
// For http proxies, route requests to different clients by hostname and other infomation.
VhostHttpMuxer *vhost.HttpMuxer
@ -66,7 +68,7 @@ func NewService() (svr *Service, err error) {
}
// Listen for accepting connections from client.
svr.listener, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
svr.listener, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
@ -74,8 +76,8 @@ func NewService() (svr *Service, err error) {
// Create http vhost muxer.
if config.ServerCommonCfg.VhostHttpPort != 0 {
var l net.Listener
l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort)
var l frpNet.Listener
l, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
return
@ -89,8 +91,8 @@ func NewService() (svr *Service, err error) {
// Create https vhost muxer.
if config.ServerCommonCfg.VhostHttpsPort != 0 {
var l net.Listener
l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort)
var l frpNet.Listener
l, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort)
if err != nil {
err = fmt.Errorf("Create vhost https listener error, %v", err)
return
@ -123,40 +125,63 @@ func (svr *Service) Run() {
}
// Start a new goroutine for dealing connections.
go func(frpConn net.Conn) {
go func(frpConn frpNet.Conn) {
dealFn := func(conn frpNet.Conn) {
var rawMsg msg.Message
frpConn.SetReadDeadline(time.Now().Add(connReadTimeout))
if rawMsg, err = msg.ReadMsg(frpConn); err != nil {
conn.SetReadDeadline(time.Now().Add(connReadTimeout))
if rawMsg, err = msg.ReadMsg(conn); err != nil {
log.Warn("Failed to read message: %v", err)
frpConn.Close()
conn.Close()
return
}
frpConn.SetReadDeadline(time.Time{})
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Login:
err = svr.RegisterControl(frpConn, m)
err = svr.RegisterControl(conn, m)
// If login failed, send error message there.
// Otherwise send success message in control's work goroutine.
if err != nil {
frpConn.Warn("%v", err)
msg.WriteMsg(frpConn, &msg.LoginResp{
conn.Warn("%v", err)
msg.WriteMsg(conn, &msg.LoginResp{
Version: version.Full(),
Error: err.Error(),
})
frpConn.Close()
conn.Close()
}
case *msg.NewWorkConn:
svr.RegisterWorkConn(frpConn, m)
svr.RegisterWorkConn(conn, m)
default:
log.Warn("Error message type for the new connection [%s]", frpConn.RemoteAddr().String())
log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
conn.Close()
}
}
if config.ServerCommonCfg.TcpMux {
session, err := smux.Server(frpConn, nil)
if err != nil {
log.Warn("Failed to create mux connection: %v", err)
frpConn.Close()
return
}
for {
stream, err := session.AcceptStream()
if err != nil {
log.Warn("Accept new mux stream error: %v", err)
return
}
wrapConn := frpNet.WrapConn(stream)
go dealFn(wrapConn)
}
} else {
dealFn(frpConn)
}
}(c)
}
}
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) {
func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (err error) {
ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)
@ -201,7 +226,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
}
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) {
func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) {
ctl, exist := svr.ctlManager.GetById(newMsg.RunId)
if !exist {
workConn.Warn("No client control found for run id [%s]", newMsg.RunId)

View File

@ -26,6 +26,18 @@ type Conn interface {
log.Logger
}
type WrapLogConn struct {
net.Conn
log.Logger
}
func WrapConn(c net.Conn) Conn {
return WrapLogConn{
Conn: c,
Logger: log.NewPrefixLogger(""),
}
}
type Listener interface {
Accept() (Conn, error)
Close() error