diff --git a/client/control.go b/client/control.go index 09ca2a0..4a588c5 100644 --- a/client/control.go +++ b/client/control.go @@ -37,7 +37,8 @@ type Control struct { runId string // manage all proxies - pm *ProxyManager + pxyCfgs map[string]config.ProxyConf + pm *ProxyManager // manage all visitors vm *VisitorManager @@ -76,6 +77,7 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m runId: runId, conn: conn, session: session, + pxyCfgs: pxyCfgs, sendCh: make(chan msg.Message, 100), readCh: make(chan msg.Message, 100), closedCh: make(chan struct{}), @@ -85,8 +87,8 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m msgHandlerShutdown: shutdown.New(), Logger: log.NewPrefixLogger(""), } - ctl.pm = NewProxyManager(ctl.sendCh, "") - ctl.pm.Reload(pxyCfgs, false) + ctl.pm = NewProxyManager(ctl.sendCh, runId) + ctl.vm = NewVisitorManager(ctl) ctl.vm.Reload(visitorCfgs) return ctl @@ -95,10 +97,10 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m func (ctl *Control) Run() { go ctl.worker() - // start all local visitors and send NewProxy message for all configured proxies - ctl.pm.Reset(ctl.sendCh, ctl.runId) - ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew}) + // start all proxies + ctl.pm.Reload(ctl.pxyCfgs) + // start all visitors go ctl.vm.Run() return } @@ -142,7 +144,7 @@ func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { } func (ctl *Control) Close() error { - ctl.pm.CloseProxies() + ctl.conn.Close() return nil } @@ -275,33 +277,26 @@ func (ctl *Control) worker() { go ctl.reader() go ctl.writer() - checkInterval := 60 * time.Second - checkProxyTicker := time.NewTicker(checkInterval) + select { + case <-ctl.closedCh: + // close related channels and wait until other goroutines done + close(ctl.readCh) + ctl.readerShutdown.WaitDone() + ctl.msgHandlerShutdown.WaitDone() - for { - select { - case <-checkProxyTicker.C: - // check which proxy registered failed and reregister it to server - ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed}) - case <-ctl.closedCh: - // close related channels and wait until other goroutines done - close(ctl.readCh) - ctl.readerShutdown.WaitDone() - ctl.msgHandlerShutdown.WaitDone() + close(ctl.sendCh) + ctl.writerShutdown.WaitDone() - close(ctl.sendCh) - ctl.writerShutdown.WaitDone() + ctl.pm.Close() + ctl.vm.Close() - ctl.pm.CloseProxies() - - close(ctl.closedDoneCh) - return - } + close(ctl.closedDoneCh) + return } } func (ctl *Control) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error { ctl.vm.Reload(visitorCfgs) - err := ctl.pm.Reload(pxyCfgs, true) - return err + ctl.pm.Reload(pxyCfgs) + return nil } diff --git a/client/event.go b/client/event.go new file mode 100644 index 0000000..b10b1e4 --- /dev/null +++ b/client/event.go @@ -0,0 +1,28 @@ +package client + +import ( + "errors" + + "github.com/fatedier/frp/models/msg" +) + +type EventType int + +const ( + EvStartProxy EventType = iota + EvCloseProxy +) + +var ( + ErrPayloadType = errors.New("error payload type") +) + +type EventHandler func(evType EventType, payload interface{}) error + +type StartProxyPayload struct { + NewProxyMsg *msg.NewProxy +} + +type CloseProxyPayload struct { + CloseProxyMsg *msg.CloseProxy +} diff --git a/client/health.go b/client/health.go index 8e84a6f..7002ade 100644 --- a/client/health.go +++ b/client/health.go @@ -16,9 +16,17 @@ package client import ( "context" + "errors" + "fmt" "net" "net/http" "time" + + "github.com/fatedier/frp/utils/log" +) + +var ( + ErrHealthCheckType = errors.New("error health check type") ) type HealthCheckMonitor struct { @@ -40,6 +48,8 @@ type HealthCheckMonitor struct { ctx context.Context cancel context.CancelFunc + + l log.Logger } func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFailedTimes int, addr string, url string, @@ -70,6 +80,10 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai } } +func (monitor *HealthCheckMonitor) SetLogger(l log.Logger) { + monitor.l = l +} + func (monitor *HealthCheckMonitor) Start() { go monitor.checkWorker() } @@ -81,7 +95,7 @@ func (monitor *HealthCheckMonitor) Stop() { func (monitor *HealthCheckMonitor) checkWorker() { for { ctx, cancel := context.WithDeadline(monitor.ctx, time.Now().Add(monitor.timeout)) - ok := monitor.doCheck(ctx) + err := monitor.doCheck(ctx) // check if this monitor has been closed select { @@ -92,14 +106,26 @@ func (monitor *HealthCheckMonitor) checkWorker() { cancel() } - if ok { + if err == nil { + if monitor.l != nil { + monitor.l.Trace("do one health check success") + } if !monitor.statusOK && monitor.statusNormalFn != nil { + if monitor.l != nil { + monitor.l.Info("health check status change to success") + } monitor.statusOK = true monitor.statusNormalFn() } } else { + if monitor.l != nil { + monitor.l.Warn("do one health check failed: %v", err) + } monitor.failedTimes++ if monitor.statusOK && int(monitor.failedTimes) >= monitor.maxFailedTimes && monitor.statusFailedFn != nil { + if monitor.l != nil { + monitor.l.Warn("health check status change to failed") + } monitor.statusOK = false monitor.statusFailedFn() } @@ -109,39 +135,44 @@ func (monitor *HealthCheckMonitor) checkWorker() { } } -func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) error { switch monitor.checkType { case "tcp": return monitor.doTcpCheck(ctx) case "http": return monitor.doHttpCheck(ctx) default: - return false + return ErrHealthCheckType } } -func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) error { + // if tcp address is not specified, always return nil + if monitor.addr == "" { + return nil + } + var d net.Dialer conn, err := d.DialContext(ctx, "tcp", monitor.addr) if err != nil { - return false + return err } conn.Close() - return true + return nil } -func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) error { req, err := http.NewRequest("GET", monitor.url, nil) if err != nil { - return false + return err } resp, err := http.DefaultClient.Do(req) if err != nil { - return false + return err } if resp.StatusCode/100 != 2 { - return false + return fmt.Errorf("do http health check, StatusCode is [%d] not 2xx", resp.StatusCode) } - return true + return nil } diff --git a/client/proxy_manager.go b/client/proxy_manager.go index dc9f350..42fa9a7 100644 --- a/client/proxy_manager.go +++ b/client/proxy_manager.go @@ -12,126 +12,91 @@ import ( "github.com/fatedier/golib/errors" ) -const ( - ProxyStatusNew = "new" - ProxyStatusStartErr = "start error" - ProxyStatusWaitStart = "wait start" - ProxyStatusRunning = "running" - ProxyStatusCheckFailed = "check failed" - ProxyStatusClosed = "closed" -) - type ProxyManager struct { sendCh chan (msg.Message) proxies map[string]*ProxyWrapper - closed bool - mu sync.RWMutex + closed bool + mu sync.RWMutex + + logPrefix string log.Logger } func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager { return &ProxyManager{ - proxies: make(map[string]*ProxyWrapper), - sendCh: msgSendCh, - closed: false, - Logger: log.NewPrefixLogger(logPrefix), + proxies: make(map[string]*ProxyWrapper), + sendCh: msgSendCh, + closed: false, + logPrefix: logPrefix, + Logger: log.NewPrefixLogger(logPrefix), } } -func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) { - pm.mu.Lock() - defer pm.mu.Unlock() - pm.closed = false - pm.sendCh = msgSendCh - pm.ClearLogPrefix() - pm.AddLogPrefix(logPrefix) -} - -// Must hold the lock before calling this function. -func (pm *ProxyManager) sendMsg(m msg.Message) error { - err := errors.PanicToError(func() { - pm.sendCh <- m - }) - if err != nil { - pm.closed = true - } - return err -} - func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error { - pm.mu.Lock() - defer pm.mu.Unlock() - if pm.closed { - return fmt.Errorf("ProxyManager is closed now") - } - + pm.mu.RLock() pxy, ok := pm.proxies[name] + pm.mu.RUnlock() if !ok { - return fmt.Errorf("no proxy found") + return fmt.Errorf("proxy [%s] not found", name) } - if err := pxy.Start(remoteAddr, serverRespErr); err != nil { - errRet := err - err = pm.sendMsg(&msg.CloseProxy{ - ProxyName: name, - }) - if err != nil { - errRet = fmt.Errorf("send CloseProxy message error") - } - return errRet + err := pxy.SetRunningStatus(remoteAddr, serverRespErr) + if err != nil { + return err } return nil } -func (pm *ProxyManager) CloseProxies() { +func (pm *ProxyManager) Close() { pm.mu.RLock() defer pm.mu.RUnlock() for _, pxy := range pm.proxies { - pxy.Close() + pxy.Stop() } } -// pxyStatus: check and start proxies in which status -func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) { +func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { + pm.mu.RLock() + pw, ok := pm.proxies[name] + pm.mu.RUnlock() + if ok { + pw.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pm *ProxyManager) HandleEvent(evType EventType, payload interface{}) error { + var m msg.Message + switch event := payload.(type) { + case *StartProxyPayload: + m = event.NewProxyMsg + case *CloseProxyPayload: + m = event.CloseProxyMsg + default: + return ErrPayloadType + } + + err := errors.PanicToError(func() { + pm.sendCh <- m + }) + return err +} + +func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { + ps := make([]*ProxyStatus, 0) pm.mu.RLock() defer pm.mu.RUnlock() - if pm.closed { - pm.Warn("CheckAndStartProxy error: ProxyManager is closed now") - return - } - for _, pxy := range pm.proxies { - status := pxy.GetStatusStr() - for _, s := range pxyStatus { - if status == s { - var newProxyMsg msg.NewProxy - pxy.Cfg.MarshalToMsg(&newProxyMsg) - err := pm.sendMsg(&newProxyMsg) - if err != nil { - pm.Warn("[%s] proxy send NewProxy message error") - return - } - pxy.WaitStart() - break - } - } + ps = append(ps, pxy.GetStatus()) } + return ps } -func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error { +func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) { pm.mu.Lock() - defer func() { - pm.mu.Unlock() - if startNow { - go pm.CheckAndStartProxy([]string{ProxyStatusNew}) - } - }() - if pm.closed { - err := fmt.Errorf("Reload error: ProxyManager is closed now") - pm.Warn(err.Error()) - return err - } + defer pm.mu.Unlock() delPxyNames := make([]string, 0) for name, pxy := range pm.proxies { @@ -149,163 +114,24 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow boo delPxyNames = append(delPxyNames, name) delete(pm.proxies, name) - pxy.Close() - err := pm.sendMsg(&msg.CloseProxy{ - ProxyName: name, - }) - if err != nil { - err = fmt.Errorf("Reload error: ProxyManager is closed now") - pm.Warn(err.Error()) - return err - } + pxy.Stop() } } - pm.Info("proxy removed: %v", delPxyNames) + if len(delPxyNames) > 0 { + pm.Info("proxy removed: %v", delPxyNames) + } addPxyNames := make([]string, 0) for name, cfg := range pxyCfgs { if _, ok := pm.proxies[name]; !ok { - pxy := NewProxyWrapper(cfg) + pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix) pm.proxies[name] = pxy addPxyNames = append(addPxyNames, name) + + pxy.Start() } } - pm.Info("proxy added: %v", addPxyNames) - return nil -} - -func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { - pm.mu.RLock() - pw, ok := pm.proxies[name] - pm.mu.RUnlock() - if ok { - pw.InWorkConn(workConn) - } else { - workConn.Close() + if len(addPxyNames) > 0 { + pm.Info("proxy added: %v", addPxyNames) } } - -func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { - ps := make([]*ProxyStatus, 0) - pm.mu.RLock() - defer pm.mu.RUnlock() - for _, pxy := range pm.proxies { - ps = append(ps, pxy.GetStatus()) - } - return ps -} - -type ProxyStatus struct { - Name string `json:"name"` - Type string `json:"type"` - Status string `json:"status"` - Err string `json:"err"` - Cfg config.ProxyConf `json:"cfg"` - - // Got from server. - RemoteAddr string `json:"remote_addr"` -} - -// ProxyWrapper is a wrapper of Proxy interface only used in ProxyManager -// Add additional proxy status info -type ProxyWrapper struct { - Name string - Type string - Status string - Err string - Cfg config.ProxyConf - - RemoteAddr string - - pxy Proxy - - mu sync.RWMutex -} - -func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper { - return &ProxyWrapper{ - Name: cfg.GetBaseInfo().ProxyName, - Type: cfg.GetBaseInfo().ProxyType, - Status: ProxyStatusNew, - Cfg: cfg, - pxy: nil, - } -} - -func (pw *ProxyWrapper) GetStatusStr() string { - pw.mu.RLock() - defer pw.mu.RUnlock() - return pw.Status -} - -func (pw *ProxyWrapper) GetStatus() *ProxyStatus { - pw.mu.RLock() - defer pw.mu.RUnlock() - ps := &ProxyStatus{ - Name: pw.Name, - Type: pw.Type, - Status: pw.Status, - Err: pw.Err, - Cfg: pw.Cfg, - RemoteAddr: pw.RemoteAddr, - } - return ps -} - -func (pw *ProxyWrapper) WaitStart() { - pw.mu.Lock() - defer pw.mu.Unlock() - pw.Status = ProxyStatusWaitStart -} - -func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error { - if pw.pxy != nil { - pw.pxy.Close() - pw.pxy = nil - } - - if serverRespErr != "" { - pw.mu.Lock() - pw.Status = ProxyStatusStartErr - pw.RemoteAddr = remoteAddr - pw.Err = serverRespErr - pw.mu.Unlock() - return fmt.Errorf(serverRespErr) - } - - pxy := NewProxy(pw.Cfg) - pw.mu.Lock() - defer pw.mu.Unlock() - pw.RemoteAddr = remoteAddr - if err := pxy.Run(); err != nil { - pw.Status = ProxyStatusStartErr - pw.Err = err.Error() - return err - } - pw.Status = ProxyStatusRunning - pw.Err = "" - pw.pxy = pxy - return nil -} - -func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { - pw.mu.RLock() - pxy := pw.pxy - pw.mu.RUnlock() - if pxy != nil { - workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - go pxy.InWorkConn(workConn) - } else { - workConn.Close() - } -} - -func (pw *ProxyWrapper) Close() { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.pxy != nil { - pw.pxy.Close() - pw.pxy = nil - } - pw.Status = ProxyStatusClosed -} diff --git a/client/proxy_wrapper.go b/client/proxy_wrapper.go new file mode 100644 index 0000000..a19d97b --- /dev/null +++ b/client/proxy_wrapper.go @@ -0,0 +1,219 @@ +package client + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/log" + frpNet "github.com/fatedier/frp/utils/net" +) + +const ( + ProxyStatusNew = "new" + ProxyStatusWaitStart = "wait start" + ProxyStatusStartErr = "start error" + ProxyStatusRunning = "running" + ProxyStatusCheckFailed = "check failed" + ProxyStatusClosed = "closed" +) + +var ( + statusCheckInterval time.Duration = 3 * time.Second + waitResponseTimeout = 20 * time.Second + startErrTimeout = 30 * time.Second +) + +type ProxyStatus struct { + Name string `json:"name"` + Type string `json:"type"` + Status string `json:"status"` + Err string `json:"err"` + Cfg config.ProxyConf `json:"cfg"` + + // Got from server. + RemoteAddr string `json:"remote_addr"` +} + +type ProxyWrapper struct { + ProxyStatus + + // underlying proxy + pxy Proxy + + // if ProxyConf has healcheck config + // monitor will watch if it is alive + monitor *HealthCheckMonitor + + // event handler + handler EventHandler + + health uint32 + lastSendStartMsg time.Time + lastStartErr time.Time + closeCh chan struct{} + mu sync.RWMutex + + log.Logger +} + +func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix string) *ProxyWrapper { + baseInfo := cfg.GetBaseInfo() + pw := &ProxyWrapper{ + ProxyStatus: ProxyStatus{ + Name: baseInfo.ProxyName, + Type: baseInfo.ProxyType, + Status: ProxyStatusNew, + Cfg: cfg, + }, + closeCh: make(chan struct{}), + handler: eventHandler, + Logger: log.NewPrefixLogger(logPrefix), + } + pw.AddLogPrefix(pw.Name) + + if baseInfo.HealthCheckType != "" { + pw.health = 1 // means failed + pw.monitor = NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, + baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr, + baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback) + pw.monitor.SetLogger(pw.Logger) + pw.Trace("enable health check monitor") + } + + pw.pxy = NewProxy(pw.Cfg) + return pw +} + +func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.Status != ProxyStatusWaitStart { + return fmt.Errorf("status not wait start, ignore start message") + } + + pw.RemoteAddr = remoteAddr + if respErr != "" { + pw.Status = ProxyStatusStartErr + pw.Err = respErr + pw.lastStartErr = time.Now() + return fmt.Errorf(pw.Err) + } + + if err := pw.pxy.Run(); err != nil { + pw.Status = ProxyStatusStartErr + pw.Err = err.Error() + pw.lastStartErr = time.Now() + return err + } + + pw.Status = ProxyStatusRunning + pw.Err = "" + return nil +} + +func (pw *ProxyWrapper) Start() { + go pw.checkWorker() + if pw.monitor != nil { + go pw.monitor.Start() + } +} + +func (pw *ProxyWrapper) Stop() { + pw.mu.Lock() + defer pw.mu.Unlock() + pw.pxy.Close() + if pw.monitor != nil { + pw.monitor.Stop() + } + pw.Status = ProxyStatusClosed + + pw.handler(EvCloseProxy, &CloseProxyPayload{ + CloseProxyMsg: &msg.CloseProxy{ + ProxyName: pw.Name, + }, + }) +} + +func (pw *ProxyWrapper) checkWorker() { + for { + // check proxy status + now := time.Now() + if atomic.LoadUint32(&pw.health) == 0 { + pw.mu.Lock() + if pw.Status == ProxyStatusNew || + pw.Status == ProxyStatusCheckFailed || + (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) || + (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) { + + pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart) + pw.Status = ProxyStatusWaitStart + + var newProxyMsg msg.NewProxy + pw.Cfg.MarshalToMsg(&newProxyMsg) + pw.lastSendStartMsg = now + pw.handler(EvStartProxy, &StartProxyPayload{ + NewProxyMsg: &newProxyMsg, + }) + } + pw.mu.Unlock() + } else { + pw.mu.Lock() + if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart { + pw.handler(EvCloseProxy, &CloseProxyPayload{ + CloseProxyMsg: &msg.CloseProxy{ + ProxyName: pw.Name, + }, + }) + pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed) + pw.Status = ProxyStatusCheckFailed + } + pw.mu.Unlock() + } + + select { + case <-pw.closeCh: + return + case <-time.After(statusCheckInterval): + } + } +} + +func (pw *ProxyWrapper) statusNormalCallback() { + atomic.StoreUint32(&pw.health, 0) + pw.Info("health check success") +} + +func (pw *ProxyWrapper) statusFailedCallback() { + atomic.StoreUint32(&pw.health, 1) + pw.Info("health check failed") +} + +func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { + pw.mu.RLock() + pxy := pw.pxy + pw.mu.RUnlock() + if pxy != nil { + workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + go pxy.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pw *ProxyWrapper) GetStatus() *ProxyStatus { + pw.mu.RLock() + defer pw.mu.RUnlock() + ps := &ProxyStatus{ + Name: pw.Name, + Type: pw.Type, + Status: pw.Status, + Err: pw.Err, + Cfg: pw.Cfg, + RemoteAddr: pw.RemoteAddr, + } + return ps +} diff --git a/client/visitor_manager.go b/client/visitor_manager.go index 3e0aa80..b223d55 100644 --- a/client/visitor_manager.go +++ b/client/visitor_manager.go @@ -96,7 +96,9 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { delete(vm.visitors, name) } } - log.Info("visitor removed: %v", delNames) + if len(delNames) > 0 { + log.Info("visitor removed: %v", delNames) + } addNames := make([]string, 0) for name, cfg := range cfgs { @@ -106,6 +108,16 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { vm.startVisitor(cfg) } } - log.Info("visitor added: %v", addNames) + if len(addNames) > 0 { + log.Info("visitor added: %v", addNames) + } return } + +func (vm *VisitorManager) Close() { + vm.mu.Lock() + defer vm.mu.Unlock() + for _, v := range vm.visitors { + v.Close() + } +} diff --git a/models/config/proxy.go b/models/config/proxy.go index 9ea680a..39e3194 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -170,6 +170,10 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i if err := cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { return err } + + if cfg.HealthCheckType == "tcp" && cfg.Plugin == "" { + cfg.HealthCheckAddr = cfg.LocalIp + fmt.Sprintf(":%d", cfg.LocalPort) + } return nil } @@ -381,7 +385,7 @@ func (cfg *LocalSvrConf) checkForCli() (err error) { // Health check info type HealthCheckConf struct { HealthCheckType string `json:"health_check_type"` // tcp | http - HealthCheckTimeout int `json:"health_check_timeout"` + HealthCheckTimeoutS int `json:"health_check_timeout_s"` HealthCheckMaxFailed int `json:"health_check_max_failed"` HealthCheckIntervalS int `json:"health_check_interval_s"` HealthCheckUrl string `json:"health_check_url"` @@ -392,8 +396,10 @@ type HealthCheckConf struct { func (cfg *HealthCheckConf) compare(cmp *HealthCheckConf) bool { if cfg.HealthCheckType != cmp.HealthCheckType || - cfg.HealthCheckUrl != cmp.HealthCheckUrl || - cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS { + cfg.HealthCheckTimeoutS != cmp.HealthCheckTimeoutS || + cfg.HealthCheckMaxFailed != cmp.HealthCheckMaxFailed || + cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS || + cfg.HealthCheckUrl != cmp.HealthCheckUrl { return false } return true @@ -403,6 +409,18 @@ func (cfg *HealthCheckConf) UnmarshalFromIni(prefix string, name string, section cfg.HealthCheckType = section["health_check_type"] cfg.HealthCheckUrl = section["health_check_url"] + if tmpStr, ok := section["health_check_timeout_s"]; ok { + if cfg.HealthCheckTimeoutS, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] health_check_timeout_s error", name) + } + } + + if tmpStr, ok := section["health_check_max_failed"]; ok { + if cfg.HealthCheckMaxFailed, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] health_check_max_failed error", name) + } + } + if tmpStr, ok := section["health_check_interval_s"]; ok { if cfg.HealthCheckIntervalS, err = strconv.Atoi(tmpStr); err != nil { return fmt.Errorf("Parse conf error: proxy [%s] health_check_interval_s error", name) @@ -419,9 +437,6 @@ func (cfg *HealthCheckConf) checkForCli() error { if cfg.HealthCheckType == "http" && cfg.HealthCheckUrl == "" { return fmt.Errorf("health_check_url is required for health check type 'http'") } - if cfg.HealthCheckIntervalS <= 0 { - return fmt.Errorf("health_check_interval_s is required and should greater than 0") - } } return nil } diff --git a/server/group/group.go b/server/group/group.go index 859239e..a0dae7c 100644 --- a/server/group/group.go +++ b/server/group/group.go @@ -22,4 +22,5 @@ var ( ErrGroupAuthFailed = errors.New("group auth failed") ErrGroupParamsInvalid = errors.New("group params invalid") ErrListenerClosed = errors.New("group listener closed") + ErrGroupDifferentPort = errors.New("group should have same remote port") ) diff --git a/server/group/tcp.go b/server/group/tcp.go index 2de05b4..8c46be6 100644 --- a/server/group/tcp.go +++ b/server/group/tcp.go @@ -114,10 +114,14 @@ func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr } go tg.worker() } else { - if tg.group != group || tg.addr != addr || tg.port != port { + if tg.group != group || tg.addr != addr { err = ErrGroupParamsInvalid return } + if tg.port != port { + err = ErrGroupDifferentPort + return + } if tg.groupKey != groupKey { err = ErrGroupAuthFailed return