frpc: udpate proxies check and start logic

This commit is contained in:
fatedier 2018-01-26 00:23:48 +08:00
parent ce8fde793c
commit 637ddbce1f
3 changed files with 35 additions and 29 deletions

View File

@ -89,8 +89,8 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m
ctl := &Control{ ctl := &Control{
svr: svr, svr: svr,
loginMsg: loginMsg, loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10), sendCh: make(chan msg.Message, 100),
readCh: make(chan msg.Message, 10), readCh: make(chan msg.Message, 100),
closedCh: make(chan int), closedCh: make(chan int),
readerShutdown: shutdown.New(), readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(), writerShutdown: shutdown.New(),
@ -98,7 +98,7 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m
Logger: log.NewPrefixLogger(""), Logger: log.NewPrefixLogger(""),
} }
ctl.pm = NewProxyManager(ctl, ctl.sendCh, "") ctl.pm = NewProxyManager(ctl, ctl.sendCh, "")
ctl.pm.Reload(pxyCfgs, visitorCfgs) ctl.pm.Reload(pxyCfgs, visitorCfgs, false)
return ctl return ctl
} }
@ -124,7 +124,7 @@ func (ctl *Control) Run() (err error) {
// start all local visitors and send NewProxy message for all configured proxies // start all local visitors and send NewProxy message for all configured proxies
ctl.pm.Reset(ctl.sendCh, ctl.runId) ctl.pm.Reset(ctl.sendCh, ctl.runId)
ctl.pm.CheckAndStartProxy() ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew})
return nil return nil
} }
@ -360,20 +360,20 @@ func (ctl *Control) msgHandler() {
// If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions. // If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions.
func (ctl *Control) worker() { func (ctl *Control) worker() {
go ctl.msgHandler() go ctl.msgHandler()
go ctl.writer()
go ctl.reader() go ctl.reader()
go ctl.writer()
var err error var err error
maxDelayTime := 20 * time.Second maxDelayTime := 20 * time.Second
delayTime := time.Second delayTime := time.Second
checkInterval := 10 * time.Second checkInterval := 60 * time.Second
checkProxyTicker := time.NewTicker(checkInterval) checkProxyTicker := time.NewTicker(checkInterval)
for { for {
select { select {
case <-checkProxyTicker.C: case <-checkProxyTicker.C:
// every 10 seconds, check which proxy registered failed and reregister it to server // check which proxy registered failed and reregister it to server
ctl.pm.CheckAndStartProxy() ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed})
case _, ok := <-ctl.closedCh: case _, ok := <-ctl.closedCh:
// we won't get any variable from this channel // we won't get any variable from this channel
if !ok { if !ok {
@ -413,8 +413,8 @@ func (ctl *Control) worker() {
} }
// init related channels and variables // init related channels and variables
ctl.sendCh = make(chan msg.Message, 10) ctl.sendCh = make(chan msg.Message, 100)
ctl.readCh = make(chan msg.Message, 10) ctl.readCh = make(chan msg.Message, 100)
ctl.closedCh = make(chan int) ctl.closedCh = make(chan int)
ctl.readerShutdown = shutdown.New() ctl.readerShutdown = shutdown.New()
ctl.writerShutdown = shutdown.New() ctl.writerShutdown = shutdown.New()
@ -427,7 +427,7 @@ func (ctl *Control) worker() {
go ctl.reader() go ctl.reader()
// start all configured proxies // start all configured proxies
ctl.pm.CheckAndStartProxy() ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew})
checkProxyTicker.Stop() checkProxyTicker.Stop()
checkProxyTicker = time.NewTicker(checkInterval) checkProxyTicker = time.NewTicker(checkInterval)
@ -437,6 +437,6 @@ func (ctl *Control) worker() {
} }
func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
err := ctl.pm.Reload(pxyCfgs, visitorCfgs) err := ctl.pm.Reload(pxyCfgs, visitorCfgs, true)
return err return err
} }

View File

@ -69,14 +69,10 @@ func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
} }
} }
func (pw *ProxyWrapper) IsRunning() bool { func (pw *ProxyWrapper) GetStatusStr() string {
pw.mu.RLock() pw.mu.RLock()
defer pw.mu.RUnlock() defer pw.mu.RUnlock()
if pw.Status == ProxyStatusRunning { return pw.Status
return true
} else {
return false
}
} }
func (pw *ProxyWrapper) GetStatus() *ProxyStatus { func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
@ -210,7 +206,8 @@ func (pm *ProxyManager) CloseProxies() {
} }
} }
func (pm *ProxyManager) CheckAndStartProxy() { // pxyStatus: check and start proxies in which status
func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
pm.mu.RLock() pm.mu.RLock()
defer pm.mu.RUnlock() defer pm.mu.RUnlock()
if pm.closed { if pm.closed {
@ -219,7 +216,9 @@ func (pm *ProxyManager) CheckAndStartProxy() {
} }
for _, pxy := range pm.proxies { for _, pxy := range pm.proxies {
if !pxy.IsRunning() { status := pxy.GetStatusStr()
for _, s := range pxyStatus {
if status == s {
var newProxyMsg msg.NewProxy var newProxyMsg msg.NewProxy
pxy.Cfg.UnMarshalToMsg(&newProxyMsg) pxy.Cfg.UnMarshalToMsg(&newProxyMsg)
err := pm.sendMsg(&newProxyMsg) err := pm.sendMsg(&newProxyMsg)
@ -227,6 +226,8 @@ func (pm *ProxyManager) CheckAndStartProxy() {
pm.Warn("[%s] proxy send NewProxy message error") pm.Warn("[%s] proxy send NewProxy message error")
return return
} }
break
}
} }
} }
@ -245,9 +246,14 @@ func (pm *ProxyManager) CheckAndStartProxy() {
} }
} }
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf, startNow bool) error {
pm.mu.Lock() pm.mu.Lock()
defer pm.mu.Unlock() defer func() {
pm.mu.Unlock()
if startNow {
go pm.CheckAndStartProxy([]string{ProxyStatusNew})
}
}()
if pm.closed { if pm.closed {
err := fmt.Errorf("Reload error: ProxyManager is closed now") err := fmt.Errorf("Reload error: ProxyManager is closed now")
pm.Warn(err.Error()) pm.Warn(err.Error())

View File

@ -99,7 +99,7 @@ func main() {
if args["status"] != nil { if args["status"] != nil {
if args["status"].(bool) { if args["status"].(bool) {
if err = CmdStatus(); err != nil { if err = CmdStatus(); err != nil {
fmt.Printf("frps get status error: %v\n", err) fmt.Printf("frpc get status error: %v\n", err)
os.Exit(1) os.Exit(1)
} else { } else {
os.Exit(0) os.Exit(0)