update group ci

This commit is contained in:
fatedier 2018-05-23 14:39:12 +08:00
parent f56b49ad3b
commit 495b577819
7 changed files with 124 additions and 21 deletions

25
server/group/group.go Normal file
View File

@ -0,0 +1,25 @@
// Copyright 2018 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package group
import (
"errors"
)
var (
ErrGroupAuthFailed = errors.New("group auth failed")
ErrGroupParamsInvalid = errors.New("group params invalid")
ErrListenerClosed = errors.New("group listener closed")
)

View File

@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package server package group
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
gerr "github.com/fatedier/golib/errors" "github.com/fatedier/frp/server/ports"
)
var ( gerr "github.com/fatedier/golib/errors"
ErrGroupAuthFailed = errors.New("group auth failed")
ErrGroupParamsInvalid = errors.New("group params invalid")
ErrListenerClosed = errors.New("group listener closed")
) )
type TcpGroupListener struct { type TcpGroupListener struct {
@ -173,11 +168,11 @@ func (tg *TcpGroup) CloseListener(ln *TcpGroupListener) {
type TcpGroupCtl struct { type TcpGroupCtl struct {
groups map[string]*TcpGroup groups map[string]*TcpGroup
portManager *PortManager portManager *ports.PortManager
mu sync.Mutex mu sync.Mutex
} }
func NewTcpGroupCtl(portManager *PortManager) *TcpGroupCtl { func NewTcpGroupCtl(portManager *ports.PortManager) *TcpGroupCtl {
return &TcpGroupCtl{ return &TcpGroupCtl{
groups: make(map[string]*TcpGroup), groups: make(map[string]*TcpGroup),
portManager: portManager, portManager: portManager,

View File

@ -1,4 +1,4 @@
package server package ports
import ( import (
"errors" "errors"

View File

@ -24,6 +24,8 @@ import (
"github.com/fatedier/frp/assets" "github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g" "github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/utils/log" "github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/util"
@ -66,13 +68,13 @@ type Service struct {
visitorManager *VisitorManager visitorManager *VisitorManager
// Manage all tcp ports // Manage all tcp ports
tcpPortManager *PortManager tcpPortManager *ports.PortManager
// Manage all udp ports // Manage all udp ports
udpPortManager *PortManager udpPortManager *ports.PortManager
// Tcp Group Controller // Tcp Group Controller
tcpGroupCtl *TcpGroupCtl tcpGroupCtl *group.TcpGroupCtl
// Controller for nat hole connections // Controller for nat hole connections
natHoleController *NatHoleController natHoleController *NatHoleController
@ -84,10 +86,10 @@ func NewService() (svr *Service, err error) {
ctlManager: NewControlManager(), ctlManager: NewControlManager(),
pxyManager: NewProxyManager(), pxyManager: NewProxyManager(),
visitorManager: NewVisitorManager(), visitorManager: NewVisitorManager(),
tcpPortManager: NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts), tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
udpPortManager: NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts), udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
} }
svr.tcpGroupCtl = NewTcpGroupCtl(svr.tcpPortManager) svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager)
// Init assets. // Init assets.
err = assets.Load(cfg.AssetsDir) err = assets.Load(cfg.AssetsDir)

View File

@ -23,6 +23,22 @@ remote_port = 10901
use_encryption = true use_encryption = true
use_compression = true use_compression = true
[tcp_group1]
type = tcp
local_ip = 127.0.0.1
local_port = 10701
remote_port = 10802
group = test1
group_key = 123
[tcp_group2]
type = tcp
local_ip = 127.0.0.1
local_port = 10702
remote_port = 10802
group = test1
group_key = 123
[udp_normal] [udp_normal]
type = udp type = udp
local_ip = 127.0.0.1 local_ip = 127.0.0.1

View File

@ -28,6 +28,24 @@ func StartTcpEchoServer() {
} }
} }
func StartTcpEchoServer2() {
l, err := frpNet.ListenTcp("127.0.0.1", TEST_TCP2_PORT)
if err != nil {
fmt.Printf("echo server2 listen error: %v\n", err)
return
}
for {
c, err := l.Accept()
if err != nil {
fmt.Printf("echo server2 accept error: %v\n", err)
return
}
go echoWorker2(c)
}
}
func StartUdpEchoServer() { func StartUdpEchoServer() {
l, err := frpNet.ListenUDP("127.0.0.1", TEST_UDP_PORT) l, err := frpNet.ListenUDP("127.0.0.1", TEST_UDP_PORT)
if err != nil { if err != nil {
@ -85,3 +103,25 @@ func echoWorker(c net.Conn) {
c.Write(buf[:n]) c.Write(buf[:n])
} }
} }
func echoWorker2(c net.Conn) {
buf := make([]byte, 2048)
for {
n, err := c.Read(buf)
if err != nil {
if err == io.EOF {
c.Close()
break
} else {
fmt.Printf("echo server read error: %v\n", err)
return
}
}
var w []byte
w = append(w, buf[:n]...)
w = append(w, buf[:n]...)
c.Write(w)
}
}

View File

@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/fatedier/frp/client" "github.com/fatedier/frp/client"
"github.com/fatedier/frp/server" "github.com/fatedier/frp/server/ports"
gnet "github.com/fatedier/golib/net" gnet "github.com/fatedier/golib/net"
) )
@ -25,7 +25,9 @@ var (
TEST_STR = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet." TEST_STR = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet."
TEST_TCP_PORT int = 10701 TEST_TCP_PORT int = 10701
TEST_TCP2_PORT int = 10702
TEST_TCP_FRP_PORT int = 10801 TEST_TCP_FRP_PORT int = 10801
TEST_TCP2_FRP_PORT int = 10802
TEST_TCP_EC_FRP_PORT int = 10901 TEST_TCP_EC_FRP_PORT int = 10901
TEST_TCP_ECHO_STR string = "tcp type:" + TEST_STR TEST_TCP_ECHO_STR string = "tcp type:" + TEST_STR
@ -62,6 +64,7 @@ var (
func init() { func init() {
go StartTcpEchoServer() go StartTcpEchoServer()
go StartTcpEchoServer2()
go StartUdpEchoServer() go StartUdpEchoServer()
go StartUnixDomainServer() go StartUnixDomainServer()
go StartHttpServer() go StartHttpServer()
@ -226,19 +229,19 @@ func TestAllowPorts(t *testing.T) {
status, err := getProxyStatus(ProxyTcpPortNotAllowed) status, err := getProxyStatus(ProxyTcpPortNotAllowed)
if assert.NoError(err) { if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status) assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error())) assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
} }
status, err = getProxyStatus(ProxyUdpPortNotAllowed) status, err = getProxyStatus(ProxyUdpPortNotAllowed)
if assert.NoError(err) { if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status) assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error())) assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
} }
status, err = getProxyStatus(ProxyTcpPortUnavailable) status, err = getProxyStatus(ProxyTcpPortUnavailable)
if assert.NoError(err) { if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status) assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortUnAvailable.Error())) assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error()))
} }
// Port normal // Port normal
@ -310,3 +313,25 @@ func TestRangePortsMapping(t *testing.T) {
} }
} }
} }
func TestGroup(t *testing.T) {
assert := assert.New(t)
var (
p1 int
p2 int
)
addr := fmt.Sprintf("127.0.0.1:%d", TEST_TCP2_FRP_PORT)
for i := 0; i < 6; i++ {
res, err := sendTcpMsg(addr, TEST_TCP_ECHO_STR)
assert.NoError(err)
switch res {
case TEST_TCP_ECHO_STR:
p1++
case TEST_TCP_ECHO_STR + TEST_TCP_ECHO_STR:
p2++
}
}
assert.True(p1 > 0 && p2 > 0, "group proxies load balancing")
}