From 637ddbce1f23bae495a2aba0b3d6b51dbc3fbbaa Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 26 Jan 2018 00:23:48 +0800 Subject: [PATCH] frpc: udpate proxies check and start logic --- client/control.go | 24 ++++++++++++------------ client/proxy_manager.go | 38 ++++++++++++++++++++++---------------- cmd/frpc/main.go | 2 +- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/client/control.go b/client/control.go index a4bb9e0..a9c4511 100644 --- a/client/control.go +++ b/client/control.go @@ -89,8 +89,8 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m ctl := &Control{ svr: svr, loginMsg: loginMsg, - sendCh: make(chan msg.Message, 10), - readCh: make(chan msg.Message, 10), + sendCh: make(chan msg.Message, 100), + readCh: make(chan msg.Message, 100), closedCh: make(chan int), readerShutdown: shutdown.New(), writerShutdown: shutdown.New(), @@ -98,7 +98,7 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m Logger: log.NewPrefixLogger(""), } ctl.pm = NewProxyManager(ctl, ctl.sendCh, "") - ctl.pm.Reload(pxyCfgs, visitorCfgs) + ctl.pm.Reload(pxyCfgs, visitorCfgs, false) return ctl } @@ -124,7 +124,7 @@ func (ctl *Control) Run() (err error) { // start all local visitors and send NewProxy message for all configured proxies ctl.pm.Reset(ctl.sendCh, ctl.runId) - ctl.pm.CheckAndStartProxy() + ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew}) return nil } @@ -360,20 +360,20 @@ func (ctl *Control) msgHandler() { // If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions. func (ctl *Control) worker() { go ctl.msgHandler() - go ctl.writer() go ctl.reader() + go ctl.writer() var err error maxDelayTime := 20 * time.Second delayTime := time.Second - checkInterval := 10 * time.Second + checkInterval := 60 * time.Second checkProxyTicker := time.NewTicker(checkInterval) for { select { case <-checkProxyTicker.C: - // every 10 seconds, check which proxy registered failed and reregister it to server - ctl.pm.CheckAndStartProxy() + // check which proxy registered failed and reregister it to server + ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed}) case _, ok := <-ctl.closedCh: // we won't get any variable from this channel if !ok { @@ -413,8 +413,8 @@ func (ctl *Control) worker() { } // init related channels and variables - ctl.sendCh = make(chan msg.Message, 10) - ctl.readCh = make(chan msg.Message, 10) + ctl.sendCh = make(chan msg.Message, 100) + ctl.readCh = make(chan msg.Message, 100) ctl.closedCh = make(chan int) ctl.readerShutdown = shutdown.New() ctl.writerShutdown = shutdown.New() @@ -427,7 +427,7 @@ func (ctl *Control) worker() { go ctl.reader() // start all configured proxies - ctl.pm.CheckAndStartProxy() + ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew}) checkProxyTicker.Stop() checkProxyTicker = time.NewTicker(checkInterval) @@ -437,6 +437,6 @@ func (ctl *Control) worker() { } func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { - err := ctl.pm.Reload(pxyCfgs, visitorCfgs) + err := ctl.pm.Reload(pxyCfgs, visitorCfgs, true) return err } diff --git a/client/proxy_manager.go b/client/proxy_manager.go index 7782398..6f1e32c 100644 --- a/client/proxy_manager.go +++ b/client/proxy_manager.go @@ -69,14 +69,10 @@ func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper { } } -func (pw *ProxyWrapper) IsRunning() bool { +func (pw *ProxyWrapper) GetStatusStr() string { pw.mu.RLock() defer pw.mu.RUnlock() - if pw.Status == ProxyStatusRunning { - return true - } else { - return false - } + return pw.Status } func (pw *ProxyWrapper) GetStatus() *ProxyStatus { @@ -210,7 +206,8 @@ func (pm *ProxyManager) CloseProxies() { } } -func (pm *ProxyManager) CheckAndStartProxy() { +// pxyStatus: check and start proxies in which status +func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) { pm.mu.RLock() defer pm.mu.RUnlock() if pm.closed { @@ -219,13 +216,17 @@ func (pm *ProxyManager) CheckAndStartProxy() { } for _, pxy := range pm.proxies { - if !pxy.IsRunning() { - var newProxyMsg msg.NewProxy - pxy.Cfg.UnMarshalToMsg(&newProxyMsg) - err := pm.sendMsg(&newProxyMsg) - if err != nil { - pm.Warn("[%s] proxy send NewProxy message error") - return + status := pxy.GetStatusStr() + for _, s := range pxyStatus { + if status == s { + var newProxyMsg msg.NewProxy + pxy.Cfg.UnMarshalToMsg(&newProxyMsg) + err := pm.sendMsg(&newProxyMsg) + if err != nil { + pm.Warn("[%s] proxy send NewProxy message error") + return + } + break } } } @@ -245,9 +246,14 @@ func (pm *ProxyManager) CheckAndStartProxy() { } } -func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { +func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf, startNow bool) error { pm.mu.Lock() - defer pm.mu.Unlock() + defer func() { + pm.mu.Unlock() + if startNow { + go pm.CheckAndStartProxy([]string{ProxyStatusNew}) + } + }() if pm.closed { err := fmt.Errorf("Reload error: ProxyManager is closed now") pm.Warn(err.Error()) diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index 234d9e7..51eb53c 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -99,7 +99,7 @@ func main() { if args["status"] != nil { if args["status"].(bool) { if err = CmdStatus(); err != nil { - fmt.Printf("frps get status error: %v\n", err) + fmt.Printf("frpc get status error: %v\n", err) os.Exit(1) } else { os.Exit(0)