From a2f1897ab479aaf4d5c7af3480deca5e281874a0 Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Fri, 26 Feb 2016 10:52:22 +0800 Subject: [PATCH] add heartbeat echo from server to client --- src/frp/cmd/frpc/control.go | 56 +++++++++++++++++++++++++++------ src/frp/cmd/frps/control.go | 29 ++++++++++++++--- src/frp/models/client/config.go | 1 + src/frp/models/consts/consts.go | 10 ++++++ 4 files changed, 83 insertions(+), 13 deletions(-) diff --git a/src/frp/cmd/frpc/control.go b/src/frp/cmd/frpc/control.go index 4cc3eca..799e8d7 100644 --- a/src/frp/cmd/frpc/control.go +++ b/src/frp/cmd/frpc/control.go @@ -14,6 +14,9 @@ import ( "frp/utils/log" ) +var connection *conn.Conn = nil +var heartBeatTimer *time.Timer = nil + func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -22,12 +25,13 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { log.Error("ProxyName [%s], connect to server failed!", cli.Name) return } - defer c.Close() + connection = c + defer connection.Close() for { // ignore response content now - _, err := c.ReadLine() - if err == io.EOF { + content, err := connection.ReadLine() + if err == io.EOF || nil == connection || connection.IsClosed() { log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 @@ -36,8 +40,8 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) tmpConn, err := loginToServer(cli) if err == nil { - c.Close() - c = tmpConn + connection.Close() + connection = tmpConn break } @@ -52,6 +56,21 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { continue } + clientCtlRes := &msg.ClientCtlRes{} + if err := json.Unmarshal([]byte(content), clientCtlRes); err != nil { + log.Warn("Parse err: %v : %s", err, content) + continue + } + if consts.SCHeartBeatRes == clientCtlRes.GeneralRes.Code { + if heartBeatTimer != nil { + log.Debug("Client rcv heartbeat response") + heartBeatTimer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second) + } else { + log.Error("heartBeatTimer is nil") + } + continue + } + cli.StartTunnel(client.ServerAddr, client.ServerPort) } } @@ -100,18 +119,37 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) { } func startHeartBeat(c *conn.Conn) { + f := func() { + log.Error("HeartBeat timeout!") + if c != nil { + c.Close() + } + } + heartBeatTimer = time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, f) + defer heartBeatTimer.Stop() + + clientCtlReq := &msg.ClientCtlReq{ + Type: consts.CSHeartBeatReq, + ProxyName: "", + Passwd: "", + } + request, err := json.Marshal(clientCtlReq) + if err != nil { + log.Warn("Serialize clientCtlReq err! Err: %v", err) + } + log.Debug("Start to send heartbeat") for { time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second) - if !c.IsClosed() { - err := c.Write("\n") + if c != nil && !c.IsClosed() { + err = c.Write(string(request) + "\n") if err != nil { - log.Error("Send hearbeat to server failed! Err:%s", err.Error()) + log.Error("Send hearbeat to server failed! Err:%v", err) continue } } else { break } } - log.Debug("heartbeat exit") + log.Debug("Heartbeat exit") } diff --git a/src/frp/cmd/frps/control.go b/src/frp/cmd/frps/control.go index 6154054..06203aa 100644 --- a/src/frp/cmd/frps/control.go +++ b/src/frp/cmd/frps/control.go @@ -162,13 +162,13 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { defer timer.Stop() for isContinueRead { - _, err := c.ReadLine() + content, err := c.ReadLine() if err != nil { if err == io.EOF { log.Warn("ProxyName [%s], client is dead!", s.Name) s.Close() break - } else if c.IsClosed() { + } else if nil == c || c.IsClosed() { log.Warn("ProxyName [%s], client connection is closed", s.Name) break } @@ -176,8 +176,29 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { log.Error("ProxyName [%s], read error: %v", s.Name, err) continue } - log.Debug("ProxyName [%s], get heartbeat", s.Name) - timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) + clientCtlReq := &msg.ClientCtlReq{} + if err := json.Unmarshal([]byte(content), clientCtlReq); err != nil { + log.Warn("Parse err: %v : %s", err, content) + continue + } + if consts.CSHeartBeatReq == clientCtlReq.Type { + log.Debug("ProxyName [%s], get heartbeat", s.Name) + timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) + + clientCtlRes := &msg.ClientCtlRes{} + clientCtlRes.GeneralRes.Code = consts.SCHeartBeatRes + response, err := json.Marshal(clientCtlRes) + if err != nil { + log.Warn("Serialize ClientCtlRes err! err: %v", err) + continue + } + + err = c.Write(string(response) + "\n") + if err != nil { + log.Error("Send heartbeat response to client failed! Err:%v", err) + continue + } + } } } diff --git a/src/frp/models/client/config.go b/src/frp/models/client/config.go index 063b216..f82e6ae 100644 --- a/src/frp/models/client/config.go +++ b/src/frp/models/client/config.go @@ -15,6 +15,7 @@ var ( LogLevel string = "warn" LogWay string = "file" HeartBeatInterval int64 = 5 + HeartBeatTimeout int64 = 30 ) var ProxyClients map[string]*ProxyClient = make(map[string]*ProxyClient) diff --git a/src/frp/models/consts/consts.go b/src/frp/models/consts/consts.go index 51dfe20..d48332a 100644 --- a/src/frp/models/consts/consts.go +++ b/src/frp/models/consts/consts.go @@ -11,3 +11,13 @@ const ( CtlConn = iota WorkConn ) + +// msg from client to server +const ( + CSHeartBeatReq = 1 +) + +// msg from server to client +const ( + SCHeartBeatRes = 100 +)