From aea9f9fbcc8735193c979c684ae360d450d40c3c Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 9 Dec 2018 21:56:46 +0800 Subject: [PATCH] health: add more ci cases and fix bugs --- Makefile | 2 +- client/proxy_wrapper.go | 29 +++- cmd/frpc/sub/root.go | 5 + cmd/frps/root.go | 9 +- conf/frpc_full.ini | 2 + models/config/proxy.go | 7 + tests/ci/health/health_test.go | 247 +++++++++++++++++++++++++++++++++ tests/ci/normal_test.go | 14 +- tests/ci/reconnect_test.go | 2 - tests/ci/reload_test.go | 8 +- tests/consts/consts.go | 3 + tests/mock/echo_server.go | 92 ++++++------ tests/mock/http_server.go | 31 +++++ tests/util/process.go | 24 +++- 14 files changed, 409 insertions(+), 66 deletions(-) create mode 100644 tests/ci/health/health_test.go diff --git a/Makefile b/Makefile index bc450af..c217eb8 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ gotest: go test -v --cover ./utils/... ci: - go test -count=1 -v ./tests/... + go test -count=1 -p=1 -v ./tests/... alltest: gotest ci diff --git a/client/proxy_wrapper.go b/client/proxy_wrapper.go index a19d97b..059d182 100644 --- a/client/proxy_wrapper.go +++ b/client/proxy_wrapper.go @@ -10,6 +10,8 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + + "github.com/fatedier/golib/errors" ) const ( @@ -55,6 +57,7 @@ type ProxyWrapper struct { lastSendStartMsg time.Time lastStartErr time.Time closeCh chan struct{} + healthNotifyCh chan struct{} mu sync.RWMutex log.Logger @@ -69,9 +72,10 @@ func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix Status: ProxyStatusNew, Cfg: cfg, }, - closeCh: make(chan struct{}), - handler: eventHandler, - Logger: log.NewPrefixLogger(logPrefix), + closeCh: make(chan struct{}), + healthNotifyCh: make(chan struct{}), + handler: eventHandler, + Logger: log.NewPrefixLogger(logPrefix), } pw.AddLogPrefix(pw.Name) @@ -125,6 +129,8 @@ func (pw *ProxyWrapper) Start() { func (pw *ProxyWrapper) Stop() { pw.mu.Lock() defer pw.mu.Unlock() + close(pw.closeCh) + close(pw.healthNotifyCh) pw.pxy.Close() if pw.monitor != nil { pw.monitor.Stop() @@ -139,6 +145,10 @@ func (pw *ProxyWrapper) Stop() { } func (pw *ProxyWrapper) checkWorker() { + if pw.monitor != nil { + // let monitor do check request first + time.Sleep(500 * time.Millisecond) + } for { // check proxy status now := time.Now() @@ -178,17 +188,30 @@ func (pw *ProxyWrapper) checkWorker() { case <-pw.closeCh: return case <-time.After(statusCheckInterval): + case <-pw.healthNotifyCh: } } } func (pw *ProxyWrapper) statusNormalCallback() { atomic.StoreUint32(&pw.health, 0) + errors.PanicToError(func() { + select { + case pw.healthNotifyCh <- struct{}{}: + default: + } + }) pw.Info("health check success") } func (pw *ProxyWrapper) statusFailedCallback() { atomic.StoreUint32(&pw.health, 1) + errors.PanicToError(func() { + select { + case pw.healthNotifyCh <- struct{}{}: + default: + } + }) pw.Info("health check failed") } diff --git a/cmd/frpc/sub/root.go b/cmd/frpc/sub/root.go index 23b3bf9..ca3b853 100644 --- a/cmd/frpc/sub/root.go +++ b/cmd/frpc/sub/root.go @@ -166,6 +166,11 @@ func parseClientCommonCfgFromCmd() (err error) { g.GlbClientCfg.LogLevel = logLevel g.GlbClientCfg.LogFile = logFile g.GlbClientCfg.LogMaxDays = int64(logMaxDays) + if logFile == "console" { + g.GlbClientCfg.LogWay = "console" + } else { + g.GlbClientCfg.LogWay = "file" + } return nil } diff --git a/cmd/frps/root.go b/cmd/frps/root.go index 76a1acd..b4ccff4 100644 --- a/cmd/frps/root.go +++ b/cmd/frps/root.go @@ -52,7 +52,6 @@ var ( dashboardPwd string assetsDir string logFile string - logWay string logLevel string logMaxDays int64 token string @@ -81,7 +80,6 @@ func init() { rootCmd.PersistentFlags().StringVarP(&dashboardUser, "dashboard_user", "", "admin", "dashboard user") rootCmd.PersistentFlags().StringVarP(&dashboardPwd, "dashboard_pwd", "", "admin", "dashboard password") rootCmd.PersistentFlags().StringVarP(&logFile, "log_file", "", "console", "log file") - rootCmd.PersistentFlags().StringVarP(&logWay, "log_way", "", "console", "log way") rootCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level") rootCmd.PersistentFlags().Int64VarP(&logMaxDays, "log_max_days", "", 3, "log_max_days") rootCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token") @@ -175,7 +173,6 @@ func parseServerCommonCfgFromCmd() (err error) { g.GlbServerCfg.DashboardUser = dashboardUser g.GlbServerCfg.DashboardPwd = dashboardPwd g.GlbServerCfg.LogFile = logFile - g.GlbServerCfg.LogWay = logWay g.GlbServerCfg.LogLevel = logLevel g.GlbServerCfg.LogMaxDays = logMaxDays g.GlbServerCfg.Token = token @@ -194,6 +191,12 @@ func parseServerCommonCfgFromCmd() (err error) { } } g.GlbServerCfg.MaxPortsPerClient = maxPortsPerClient + + if logFile == "console" { + g.GlbClientCfg.LogWay = "console" + } else { + g.GlbClientCfg.LogWay = "file" + } return } diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 2b5d327..3f9a69d 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -77,6 +77,8 @@ group_key = 123456 # frpc will connect local service's port to detect it's healthy status health_check_type = tcp health_check_interval_s = 10 +health_check_max_failed = 1 +health_check_timeout_s = 3 [ssh_random] type = tcp diff --git a/models/config/proxy.go b/models/config/proxy.go index 39e3194..0766b5a 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -174,6 +174,13 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i if cfg.HealthCheckType == "tcp" && cfg.Plugin == "" { cfg.HealthCheckAddr = cfg.LocalIp + fmt.Sprintf(":%d", cfg.LocalPort) } + if cfg.HealthCheckType == "http" && cfg.Plugin == "" && cfg.HealthCheckUrl != "" { + s := fmt.Sprintf("http://%s:%d", cfg.LocalIp, cfg.LocalPort) + if !strings.HasPrefix(cfg.HealthCheckUrl, "/") { + s += "/" + } + cfg.HealthCheckUrl = s + cfg.HealthCheckUrl + } return nil } diff --git a/tests/ci/health/health_test.go b/tests/ci/health/health_test.go new file mode 100644 index 0000000..fd1a528 --- /dev/null +++ b/tests/ci/health/health_test.go @@ -0,0 +1,247 @@ +package health + +import ( + "net/http" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/fatedier/frp/tests/config" + "github.com/fatedier/frp/tests/consts" + "github.com/fatedier/frp/tests/mock" + "github.com/fatedier/frp/tests/util" + + "github.com/stretchr/testify/assert" +) + +const FRPS_CONF = ` +[common] +bind_addr = 0.0.0.0 +bind_port = 14000 +vhost_http_port = 14000 +log_file = console +log_level = debug +token = 123456 +` + +const FRPC_CONF = ` +[common] +server_addr = 127.0.0.1 +server_port = 14000 +log_file = console +log_level = debug +token = 123456 + +[tcp1] +type = tcp +local_port = 15001 +remote_port = 15000 +group = test +group_key = 123 +health_check_type = tcp +health_check_interval_s = 1 + +[tcp2] +type = tcp +local_port = 15002 +remote_port = 15000 +group = test +group_key = 123 +health_check_type = tcp +health_check_interval_s = 1 + +[http1] +type = http +local_port = 15003 +custom_domains = test1.com +health_check_type = http +health_check_interval_s = 1 +health_check_url = /health + +[http2] +type = http +local_port = 15004 +custom_domains = test2.com +health_check_type = http +health_check_interval_s = 1 +health_check_url = /health +` + +func TestHealthCheck(t *testing.T) { + assert := assert.New(t) + + // ****** start backgroud services ****** + echoSvc1 := mock.NewEchoServer(15001, 1, "echo1") + err := echoSvc1.Start() + if assert.NoError(err) { + defer echoSvc1.Stop() + } + + echoSvc2 := mock.NewEchoServer(15002, 1, "echo2") + err = echoSvc2.Start() + if assert.NoError(err) { + defer echoSvc2.Stop() + } + + var healthMu sync.RWMutex + svc1Health := true + svc2Health := true + httpSvc1 := mock.NewHttpServer(15003, func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "health") { + healthMu.RLock() + defer healthMu.RUnlock() + if svc1Health { + w.WriteHeader(200) + } else { + w.WriteHeader(500) + } + } else { + w.Write([]byte("http1")) + } + }) + err = httpSvc1.Start() + if assert.NoError(err) { + defer httpSvc1.Stop() + } + + httpSvc2 := mock.NewHttpServer(15004, func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "health") { + healthMu.RLock() + defer healthMu.RUnlock() + if svc2Health { + w.WriteHeader(200) + } else { + w.WriteHeader(500) + } + } else { + w.Write([]byte("http2")) + } + }) + err = httpSvc2.Start() + if assert.NoError(err) { + defer httpSvc2.Stop() + } + + time.Sleep(200 * time.Millisecond) + + // ****** start frps and frpc ****** + frpsCfgPath, err := config.GenerateConfigFile(consts.FRPS_NORMAL_CONFIG, FRPS_CONF) + if assert.NoError(err) { + defer os.Remove(frpsCfgPath) + } + + frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_CONF) + if assert.NoError(err) { + defer os.Remove(frpcCfgPath) + } + + frpsProcess := util.NewProcess(consts.FRPS_SUB_BIN_PATH, []string{"-c", frpsCfgPath}) + err = frpsProcess.Start() + if assert.NoError(err) { + defer frpsProcess.Stop() + } + + time.Sleep(100 * time.Millisecond) + + frpcProcess := util.NewProcess(consts.FRPC_SUB_BIN_PATH, []string{"-c", frpcCfgPath}) + err = frpcProcess.Start() + if assert.NoError(err) { + defer frpcProcess.Stop() + } + time.Sleep(1000 * time.Millisecond) + + // ****** healcheck type tcp ****** + // echo1 and echo2 is ok + result := make([]string, 0) + res, err := util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.Contains(result, "echo1") + assert.Contains(result, "echo2") + + // close echo2 server, echo1 is work + echoSvc2.Stop() + time.Sleep(1200 * time.Millisecond) + + result = make([]string, 0) + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.NotContains(result, "echo2") + + // resume echo2 server, all services are ok + echoSvc2 = mock.NewEchoServer(15002, 1, "echo2") + err = echoSvc2.Start() + if assert.NoError(err) { + defer echoSvc2.Stop() + } + + time.Sleep(1200 * time.Millisecond) + + result = make([]string, 0) + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.Contains(result, "echo1") + assert.Contains(result, "echo2") + + // ****** healcheck type http ****** + // http1 and http2 is ok + code, body, _, err := util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http2", body) + + // http2 health check error + healthMu.Lock() + svc2Health = false + healthMu.Unlock() + time.Sleep(1200 * time.Millisecond) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, _, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(404, code) + + // resume http2 service, http1 and http2 are ok + healthMu.Lock() + svc2Health = true + healthMu.Unlock() + time.Sleep(1200 * time.Millisecond) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http2", body) +} diff --git a/tests/ci/normal_test.go b/tests/ci/normal_test.go index c528346..d76cc2d 100644 --- a/tests/ci/normal_test.go +++ b/tests/ci/normal_test.go @@ -22,13 +22,21 @@ import ( ) func TestMain(m *testing.M) { - go mock.StartTcpEchoServer(consts.TEST_TCP_PORT) - go mock.StartTcpEchoServer2(consts.TEST_TCP2_PORT) + var err error + tcpEcho1 := mock.NewEchoServer(consts.TEST_TCP_PORT, 1, "") + tcpEcho2 := mock.NewEchoServer(consts.TEST_TCP2_PORT, 2, "") + + if err = tcpEcho1.Start(); err != nil { + panic(err) + } + if err = tcpEcho2.Start(); err != nil { + panic(err) + } + go mock.StartUdpEchoServer(consts.TEST_UDP_PORT) go mock.StartUnixDomainServer(consts.TEST_UNIX_DOMAIN_ADDR) go mock.StartHttpServer(consts.TEST_HTTP_PORT) - var err error p1 := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", "./auto_test_frps.ini"}) if err = p1.Start(); err != nil { panic(err) diff --git a/tests/ci/reconnect_test.go b/tests/ci/reconnect_test.go index 7974c2c..114567b 100644 --- a/tests/ci/reconnect_test.go +++ b/tests/ci/reconnect_test.go @@ -17,7 +17,6 @@ const FRPS_RECONNECT_CONF = ` bind_addr = 0.0.0.0 bind_port = 20000 log_file = console -# debug, info, warn, error log_level = debug token = 123456 ` @@ -27,7 +26,6 @@ const FRPC_RECONNECT_CONF = ` server_addr = 127.0.0.1 server_port = 20000 log_file = console -# debug, info, warn, error log_level = debug token = 123456 admin_port = 21000 diff --git a/tests/ci/reload_test.go b/tests/ci/reload_test.go index 9811db9..f05f2ed 100644 --- a/tests/ci/reload_test.go +++ b/tests/ci/reload_test.go @@ -84,7 +84,8 @@ func TestReload(t *testing.T) { frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_1) if assert.NoError(err) { - defer os.Remove(frpcCfgPath) + rmFile1 := frpcCfgPath + defer os.Remove(rmFile1) } frpsProcess := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", frpsCfgPath}) @@ -120,7 +121,10 @@ func TestReload(t *testing.T) { // reload frpc config frpcCfgPath, err = config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_2) - assert.NoError(err) + if assert.NoError(err) { + rmFile2 := frpcCfgPath + defer os.Remove(rmFile2) + } err = util.ReloadConf("127.0.0.1:21000", "abc", "abc") assert.NoError(err) diff --git a/tests/consts/consts.go b/tests/consts/consts.go index 60dcffe..4e1c1a0 100644 --- a/tests/consts/consts.go +++ b/tests/consts/consts.go @@ -6,6 +6,9 @@ var ( FRPS_BIN_PATH = "../../bin/frps" FRPC_BIN_PATH = "../../bin/frpc" + FRPS_SUB_BIN_PATH = "../../../bin/frps" + FRPC_SUB_BIN_PATH = "../../../bin/frpc" + FRPS_NORMAL_CONFIG = "./auto_test_frps.ini" FRPC_NORMAL_CONFIG = "./auto_test_frpc.ini" diff --git a/tests/mock/echo_server.go b/tests/mock/echo_server.go index a24947f..e029f50 100644 --- a/tests/mock/echo_server.go +++ b/tests/mock/echo_server.go @@ -10,40 +10,48 @@ import ( frpNet "github.com/fatedier/frp/utils/net" ) -func StartTcpEchoServer(port int) { - l, err := frpNet.ListenTcp("127.0.0.1", port) - if err != nil { - fmt.Printf("echo server listen error: %v\n", err) - return +type EchoServer struct { + l frpNet.Listener + + port int + repeatedNum int + specifyStr string +} + +func NewEchoServer(port int, repeatedNum int, specifyStr string) *EchoServer { + if repeatedNum <= 0 { + repeatedNum = 1 } - - for { - c, err := l.Accept() - if err != nil { - fmt.Printf("echo server accept error: %v\n", err) - return - } - - go echoWorker(c) + return &EchoServer{ + port: port, + repeatedNum: repeatedNum, + specifyStr: specifyStr, } } -func StartTcpEchoServer2(port int) { - l, err := frpNet.ListenTcp("127.0.0.1", port) +func (es *EchoServer) Start() error { + l, err := frpNet.ListenTcp("127.0.0.1", es.port) if err != nil { - fmt.Printf("echo server2 listen error: %v\n", err) - return + fmt.Printf("echo server listen error: %v\n", err) + return err } + es.l = l - for { - c, err := l.Accept() - if err != nil { - fmt.Printf("echo server2 accept error: %v\n", err) - return + go func() { + for { + c, err := l.Accept() + if err != nil { + return + } + + go echoWorker(c, es.repeatedNum, es.specifyStr) } + }() + return nil +} - go echoWorker2(c) - } +func (es *EchoServer) Stop() { + es.l.Close() } func StartUdpEchoServer(port int) { @@ -60,7 +68,7 @@ func StartUdpEchoServer(port int) { return } - go echoWorker(c) + go echoWorker(c, 1, "") } } @@ -80,11 +88,11 @@ func StartUnixDomainServer(unixPath string) { return } - go echoWorker(c) + go echoWorker(c, 1, "") } } -func echoWorker(c net.Conn) { +func echoWorker(c net.Conn, repeatedNum int, specifyStr string) { buf := make([]byte, 2048) for { @@ -99,28 +107,14 @@ func echoWorker(c net.Conn) { } } - c.Write(buf[:n]) - } -} - -func echoWorker2(c net.Conn) { - buf := make([]byte, 2048) - - for { - n, err := c.Read(buf) - if err != nil { - if err == io.EOF { - c.Close() - break - } else { - fmt.Printf("echo server read error: %v\n", err) - return + if specifyStr != "" { + c.Write([]byte(specifyStr)) + } else { + var w []byte + for i := 0; i < repeatedNum; i++ { + w = append(w, buf[:n]...) } + c.Write(w) } - - var w []byte - w = append(w, buf[:n]...) - w = append(w, buf[:n]...) - c.Write(w) } } diff --git a/tests/mock/http_server.go b/tests/mock/http_server.go index 7e97ad6..37b2b1e 100644 --- a/tests/mock/http_server.go +++ b/tests/mock/http_server.go @@ -3,6 +3,7 @@ package mock import ( "fmt" "log" + "net" "net/http" "regexp" "strings" @@ -12,6 +13,36 @@ import ( "github.com/gorilla/websocket" ) +type HttpServer struct { + l net.Listener + + port int + handler http.HandlerFunc +} + +func NewHttpServer(port int, handler http.HandlerFunc) *HttpServer { + return &HttpServer{ + port: port, + handler: handler, + } +} + +func (hs *HttpServer) Start() error { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", hs.port)) + if err != nil { + fmt.Printf("http server listen error: %v\n", err) + return err + } + hs.l = l + + go http.Serve(l, http.HandlerFunc(hs.handler)) + return nil +} + +func (hs *HttpServer) Stop() { + hs.l.Close() +} + var upgrader = websocket.Upgrader{} func StartHttpServer(port int) { diff --git a/tests/util/process.go b/tests/util/process.go index 1e34040..e707846 100644 --- a/tests/util/process.go +++ b/tests/util/process.go @@ -1,22 +1,29 @@ package util import ( + "bytes" "context" "os/exec" ) type Process struct { - cmd *exec.Cmd - cancel context.CancelFunc + cmd *exec.Cmd + cancel context.CancelFunc + errorOutput *bytes.Buffer + + beforeStopHandler func() } func NewProcess(path string, params []string) *Process { ctx, cancel := context.WithCancel(context.Background()) cmd := exec.CommandContext(ctx, path, params...) - return &Process{ + p := &Process{ cmd: cmd, cancel: cancel, } + p.errorOutput = bytes.NewBufferString("") + cmd.Stderr = p.errorOutput + return p } func (p *Process) Start() error { @@ -24,6 +31,17 @@ func (p *Process) Start() error { } func (p *Process) Stop() error { + if p.beforeStopHandler != nil { + p.beforeStopHandler() + } p.cancel() return p.cmd.Wait() } + +func (p *Process) ErrorOutput() string { + return p.errorOutput.String() +} + +func (p *Process) SetBeforeStopHandler(fn func()) { + p.beforeStopHandler = fn +}