diff --git a/server/group/group.go b/server/group/group.go new file mode 100644 index 0000000..859239e --- /dev/null +++ b/server/group/group.go @@ -0,0 +1,25 @@ +// Copyright 2018 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package group + +import ( + "errors" +) + +var ( + ErrGroupAuthFailed = errors.New("group auth failed") + ErrGroupParamsInvalid = errors.New("group params invalid") + ErrListenerClosed = errors.New("group listener closed") +) diff --git a/server/group.go b/server/group/tcp.go similarity index 93% rename from server/group.go rename to server/group/tcp.go index 24b292c..2de05b4 100644 --- a/server/group.go +++ b/server/group/tcp.go @@ -12,21 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package group import ( - "errors" "fmt" "net" "sync" - gerr "github.com/fatedier/golib/errors" -) + "github.com/fatedier/frp/server/ports" -var ( - ErrGroupAuthFailed = errors.New("group auth failed") - ErrGroupParamsInvalid = errors.New("group params invalid") - ErrListenerClosed = errors.New("group listener closed") + gerr "github.com/fatedier/golib/errors" ) type TcpGroupListener struct { @@ -173,11 +168,11 @@ func (tg *TcpGroup) CloseListener(ln *TcpGroupListener) { type TcpGroupCtl struct { groups map[string]*TcpGroup - portManager *PortManager + portManager *ports.PortManager mu sync.Mutex } -func NewTcpGroupCtl(portManager *PortManager) *TcpGroupCtl { +func NewTcpGroupCtl(portManager *ports.PortManager) *TcpGroupCtl { return &TcpGroupCtl{ groups: make(map[string]*TcpGroup), portManager: portManager, diff --git a/server/ports.go b/server/ports/ports.go similarity index 99% rename from server/ports.go rename to server/ports/ports.go index 1d084d4..a42fd91 100644 --- a/server/ports.go +++ b/server/ports/ports.go @@ -1,4 +1,4 @@ -package server +package ports import ( "errors" diff --git a/server/service.go b/server/service.go index 8038fdd..a9b14a6 100644 --- a/server/service.go +++ b/server/service.go @@ -24,6 +24,8 @@ import ( "github.com/fatedier/frp/assets" "github.com/fatedier/frp/g" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/server/group" + "github.com/fatedier/frp/server/ports" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" @@ -66,13 +68,13 @@ type Service struct { visitorManager *VisitorManager // Manage all tcp ports - tcpPortManager *PortManager + tcpPortManager *ports.PortManager // Manage all udp ports - udpPortManager *PortManager + udpPortManager *ports.PortManager // Tcp Group Controller - tcpGroupCtl *TcpGroupCtl + tcpGroupCtl *group.TcpGroupCtl // Controller for nat hole connections natHoleController *NatHoleController @@ -84,10 +86,10 @@ func NewService() (svr *Service, err error) { ctlManager: NewControlManager(), pxyManager: NewProxyManager(), visitorManager: NewVisitorManager(), - tcpPortManager: NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts), - udpPortManager: NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts), + tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts), + udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts), } - svr.tcpGroupCtl = NewTcpGroupCtl(svr.tcpPortManager) + svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager) // Init assets. err = assets.Load(cfg.AssetsDir) diff --git a/tests/conf/auto_test_frpc.ini b/tests/conf/auto_test_frpc.ini index 54d37f6..14f2e85 100644 --- a/tests/conf/auto_test_frpc.ini +++ b/tests/conf/auto_test_frpc.ini @@ -23,6 +23,22 @@ remote_port = 10901 use_encryption = true use_compression = true +[tcp_group1] +type = tcp +local_ip = 127.0.0.1 +local_port = 10701 +remote_port = 10802 +group = test1 +group_key = 123 + +[tcp_group2] +type = tcp +local_ip = 127.0.0.1 +local_port = 10702 +remote_port = 10802 +group = test1 +group_key = 123 + [udp_normal] type = udp local_ip = 127.0.0.1 diff --git a/tests/echo_server.go b/tests/echo_server.go index 5c73fef..380c036 100644 --- a/tests/echo_server.go +++ b/tests/echo_server.go @@ -28,6 +28,24 @@ func StartTcpEchoServer() { } } +func StartTcpEchoServer2() { + l, err := frpNet.ListenTcp("127.0.0.1", TEST_TCP2_PORT) + if err != nil { + fmt.Printf("echo server2 listen error: %v\n", err) + return + } + + for { + c, err := l.Accept() + if err != nil { + fmt.Printf("echo server2 accept error: %v\n", err) + return + } + + go echoWorker2(c) + } +} + func StartUdpEchoServer() { l, err := frpNet.ListenUDP("127.0.0.1", TEST_UDP_PORT) if err != nil { @@ -85,3 +103,25 @@ func echoWorker(c net.Conn) { c.Write(buf[:n]) } } + +func echoWorker2(c net.Conn) { + buf := make([]byte, 2048) + + for { + n, err := c.Read(buf) + if err != nil { + if err == io.EOF { + c.Close() + break + } else { + fmt.Printf("echo server read error: %v\n", err) + return + } + } + + var w []byte + w = append(w, buf[:n]...) + w = append(w, buf[:n]...) + c.Write(w) + } +} diff --git a/tests/func_test.go b/tests/func_test.go index b71bed0..1d0cd37 100644 --- a/tests/func_test.go +++ b/tests/func_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/fatedier/frp/client" - "github.com/fatedier/frp/server" + "github.com/fatedier/frp/server/ports" gnet "github.com/fatedier/golib/net" ) @@ -25,7 +25,9 @@ var ( TEST_STR = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet." TEST_TCP_PORT int = 10701 + TEST_TCP2_PORT int = 10702 TEST_TCP_FRP_PORT int = 10801 + TEST_TCP2_FRP_PORT int = 10802 TEST_TCP_EC_FRP_PORT int = 10901 TEST_TCP_ECHO_STR string = "tcp type:" + TEST_STR @@ -62,6 +64,7 @@ var ( func init() { go StartTcpEchoServer() + go StartTcpEchoServer2() go StartUdpEchoServer() go StartUnixDomainServer() go StartHttpServer() @@ -226,19 +229,19 @@ func TestAllowPorts(t *testing.T) { status, err := getProxyStatus(ProxyTcpPortNotAllowed) if assert.NoError(err) { assert.Equal(client.ProxyStatusStartErr, status.Status) - assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error())) + assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error())) } status, err = getProxyStatus(ProxyUdpPortNotAllowed) if assert.NoError(err) { assert.Equal(client.ProxyStatusStartErr, status.Status) - assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error())) + assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error())) } status, err = getProxyStatus(ProxyTcpPortUnavailable) if assert.NoError(err) { assert.Equal(client.ProxyStatusStartErr, status.Status) - assert.True(strings.Contains(status.Err, server.ErrPortUnAvailable.Error())) + assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error())) } // Port normal @@ -310,3 +313,25 @@ func TestRangePortsMapping(t *testing.T) { } } } + +func TestGroup(t *testing.T) { + assert := assert.New(t) + + var ( + p1 int + p2 int + ) + addr := fmt.Sprintf("127.0.0.1:%d", TEST_TCP2_FRP_PORT) + + for i := 0; i < 6; i++ { + res, err := sendTcpMsg(addr, TEST_TCP_ECHO_STR) + assert.NoError(err) + switch res { + case TEST_TCP_ECHO_STR: + p1++ + case TEST_TCP_ECHO_STR + TEST_TCP_ECHO_STR: + p2++ + } + } + assert.True(p1 > 0 && p2 > 0, "group proxies load balancing") +}