support protocol kcp

This commit is contained in:
fatedier 2017-06-04 19:56:21 +08:00
parent 74cf57feb3
commit 80ba931326
9 changed files with 202 additions and 5 deletions

View File

@ -150,7 +150,7 @@ func (ctl *Control) NewWorkConn() {
workConn = net.WrapConn(stream) workConn = net.WrapConn(stream)
} else { } else {
workConn, err = net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, workConn, err = net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
if err != nil { if err != nil {
ctl.Warn("start new work connection error: %v", err) ctl.Warn("start new work connection error: %v", err)
@ -199,7 +199,7 @@ func (ctl *Control) login() (err error) {
ctl.session.Close() ctl.session.Close()
} }
conn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, conn, err := net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
if err != nil { if err != nil {
return err return err

View File

@ -294,7 +294,7 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
workConn.Debug("handle by plugin finished") workConn.Debug("handle by plugin finished")
return return
} else { } else {
localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
if err != nil { if err != nil {
workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
return return

View File

@ -32,6 +32,10 @@ user = your_name
# default is true # default is true
login_fail_exit = true login_fail_exit = true
# communication protocol used to connect to server
# now it supports tcp and kcp, default is tcp
protocol = tcp
# proxy names you want to start divided by ',' # proxy names you want to start divided by ','
# default is empty, means all proxies # default is empty, means all proxies
# start = ssh,dns # start = ssh,dns

View File

@ -49,3 +49,6 @@ subdomain_host = frps.com
# if tcp stream multiplexing is used, default is true # if tcp stream multiplexing is used, default is true
tcp_mux = true tcp_mux = true
# if server support client using kcp protocol, default is true
support_kcp = true

View File

@ -41,6 +41,7 @@ type ClientCommonConf struct {
User string User string
LoginFailExit bool LoginFailExit bool
Start map[string]struct{} Start map[string]struct{}
Protocol string
HeartBeatInterval int64 HeartBeatInterval int64
HeartBeatTimeout int64 HeartBeatTimeout int64
} }
@ -61,6 +62,7 @@ func GetDeaultClientCommonConf() *ClientCommonConf {
User: "", User: "",
LoginFailExit: true, LoginFailExit: true,
Start: make(map[string]struct{}), Start: make(map[string]struct{}),
Protocol: "tcp",
HeartBeatInterval: 30, HeartBeatInterval: 30,
HeartBeatTimeout: 90, HeartBeatTimeout: 90,
} }
@ -154,6 +156,15 @@ func LoadClientCommonConf(conf ini.File) (cfg *ClientCommonConf, err error) {
cfg.LoginFailExit = true cfg.LoginFailExit = true
} }
tmpStr, ok = conf.Get("common", "protocol")
if ok {
// Now it only support tcp and kcp.
if tmpStr != "kcp" {
tmpStr = "tcp"
}
cfg.Protocol = tmpStr
}
tmpStr, ok = conf.Get("common", "heartbeat_timeout") tmpStr, ok = conf.Get("common", "heartbeat_timeout")
if ok { if ok {
v, err = strconv.ParseInt(tmpStr, 10, 64) v, err = strconv.ParseInt(tmpStr, 10, 64)

View File

@ -51,6 +51,7 @@ type ServerCommonConf struct {
AuthTimeout int64 AuthTimeout int64
SubDomainHost string SubDomainHost string
TcpMux bool TcpMux bool
SupportKcp bool
// if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected // if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected
PrivilegeAllowPorts [][2]int64 PrivilegeAllowPorts [][2]int64
@ -79,6 +80,7 @@ func GetDefaultServerCommonConf() *ServerCommonConf {
AuthTimeout: 900, AuthTimeout: 900,
SubDomainHost: "", SubDomainHost: "",
TcpMux: true, TcpMux: true,
SupportKcp: true,
MaxPoolCount: 5, MaxPoolCount: 5,
HeartBeatTimeout: 90, HeartBeatTimeout: 90,
UserConnTimeout: 10, UserConnTimeout: 10,
@ -231,6 +233,13 @@ func LoadServerCommonConf(conf ini.File) (cfg *ServerCommonConf, err error) {
cfg.TcpMux = true cfg.TcpMux = true
} }
tmpStr, ok = conf.Get("common", "support_kcp")
if ok && tmpStr == "false" {
cfg.SupportKcp = false
} else {
cfg.SupportKcp = true
}
tmpStr, ok = conf.Get("common", "heartbeat_timeout") tmpStr, ok = conf.Get("common", "heartbeat_timeout")
if ok { if ok {
v, errRet := strconv.ParseInt(tmpStr, 10, 64) v, errRet := strconv.ParseInt(tmpStr, 10, 64)

View File

@ -41,6 +41,9 @@ type Service struct {
// Accept connections from client. // Accept connections from client.
listener frpNet.Listener listener frpNet.Listener
// Accept connections using kcp.
kcpListener frpNet.Listener
// For http proxies, route requests to different clients by hostname and other infomation. // For http proxies, route requests to different clients by hostname and other infomation.
VhostHttpMuxer *vhost.HttpMuxer VhostHttpMuxer *vhost.HttpMuxer
@ -73,6 +76,17 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create server listener error, %v", err) err = fmt.Errorf("Create server listener error, %v", err)
return return
} }
log.Info("frps tcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
// Listen for accepting connections from client using kcp protocol.
if config.ServerCommonCfg.SupportKcp {
svr.kcpListener, err = frpNet.ListenKcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
if err != nil {
err = fmt.Errorf("Listen on kcp address [%s:%d] error: %v", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort, err)
return
}
log.Info("frps kcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
}
// Create http vhost muxer. // Create http vhost muxer.
if config.ServerCommonCfg.VhostHttpPort != 0 { if config.ServerCommonCfg.VhostHttpPort != 0 {
@ -87,6 +101,7 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create vhost httpMuxer error, %v", err) err = fmt.Errorf("Create vhost httpMuxer error, %v", err)
return return
} }
log.Info("http service listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort)
} }
// Create https vhost muxer. // Create https vhost muxer.
@ -102,6 +117,7 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err) err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return return
} }
log.Info("https service listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort)
} }
// Create dashboard web server. // Create dashboard web server.
@ -117,9 +133,17 @@ func NewService() (svr *Service, err error) {
} }
func (svr *Service) Run() { func (svr *Service) Run() {
if config.ServerCommonCfg.SupportKcp {
go svr.HandleListener(svr.kcpListener)
}
svr.HandleListener(svr.listener)
}
func (svr *Service) HandleListener(l frpNet.Listener) {
// Listen for incoming connections from client. // Listen for incoming connections from client.
for { for {
c, err := svr.listener.Accept() c, err := l.Accept()
if err != nil { if err != nil {
log.Warn("Listener for incoming connections from client closed") log.Warn("Listener for incoming connections from client closed")
return return
@ -131,7 +155,7 @@ func (svr *Service) Run() {
var rawMsg msg.Message var rawMsg msg.Message
conn.SetReadDeadline(time.Now().Add(connReadTimeout)) conn.SetReadDeadline(time.Now().Add(connReadTimeout))
if rawMsg, err = msg.ReadMsg(conn); err != nil { if rawMsg, err = msg.ReadMsg(conn); err != nil {
log.Warn("Failed to read message: %v", err) log.Trace("Failed to read message: %v", err)
conn.Close() conn.Close()
return return
} }

View File

@ -15,11 +15,14 @@
package net package net
import ( import (
"fmt"
"io" "io"
"net" "net"
"time" "time"
"github.com/fatedier/frp/utils/log" "github.com/fatedier/frp/utils/log"
kcp "github.com/xtaci/kcp-go"
) )
// Conn is the interface of connections used in frp. // Conn is the interface of connections used in frp.
@ -77,3 +80,59 @@ type Listener interface {
Close() error Close() error
log.Logger log.Logger
} }
type LogListener struct {
l net.Listener
net.Listener
log.Logger
}
func WrapLogListener(l net.Listener) Listener {
return &LogListener{
l: l,
Listener: l,
Logger: log.NewPrefixLogger(""),
}
}
func (logL *LogListener) Accept() (Conn, error) {
c, err := logL.l.Accept()
return WrapConn(c), err
}
func ConnectServer(protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServer(addr)
case "kcp":
kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3)
if errRet != nil {
err = errRet
return
}
kcpConn.SetStreamMode(true)
kcpConn.SetWriteDelay(true)
kcpConn.SetNoDelay(1, 20, 2, 1)
kcpConn.SetWindowSize(128, 512)
kcpConn.SetMtu(1350)
kcpConn.SetACKNoDelay(false)
kcpConn.SetReadBuffer(4194304)
kcpConn.SetWriteBuffer(4194304)
c = WrapConn(kcpConn)
return
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}
func ConnectServerByHttpProxy(httpProxy string, protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServerByHttpProxy(httpProxy, addr)
case "kcp":
// http proxy is not supported for kcp
return ConnectServer(protocol, addr)
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}

87
utils/net/kcp.go Normal file
View File

@ -0,0 +1,87 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package net
import (
"fmt"
"net"
"github.com/fatedier/frp/utils/log"
kcp "github.com/xtaci/kcp-go"
)
type KcpListener struct {
net.Addr
listener net.Listener
accept chan Conn
closeFlag bool
log.Logger
}
func ListenKcp(bindAddr string, bindPort int64) (l *KcpListener, err error) {
listener, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", bindAddr, bindPort), nil, 10, 3)
if err != nil {
return l, err
}
listener.SetReadBuffer(4194304)
listener.SetWriteBuffer(4194304)
l = &KcpListener{
Addr: listener.Addr(),
listener: listener,
accept: make(chan Conn),
closeFlag: false,
Logger: log.NewPrefixLogger(""),
}
go func() {
for {
conn, err := listener.AcceptKCP()
if err != nil {
if l.closeFlag {
close(l.accept)
return
}
continue
}
conn.SetStreamMode(true)
conn.SetWriteDelay(true)
conn.SetNoDelay(1, 20, 2, 1)
conn.SetMtu(1350)
conn.SetWindowSize(1024, 1024)
conn.SetACKNoDelay(false)
l.accept <- WrapConn(conn)
}
}()
return l, err
}
func (l *KcpListener) Accept() (Conn, error) {
conn, ok := <-l.accept
if !ok {
return conn, fmt.Errorf("channel for kcp listener closed")
}
return conn, nil
}
func (l *KcpListener) Close() error {
if !l.closeFlag {
l.closeFlag = true
l.listener.Close()
}
return nil
}