fix: duplicate call loginFunc (#3860) (#3875)

modify ext func, specify whether exit immediately
This commit is contained in:
im_zhou 2023-12-21 20:51:10 +08:00 committed by GitHub
parent 3540910879
commit 3bf6605e1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 19 deletions

View File

@ -239,15 +239,15 @@ func (ctl *Control) heartbeatWorker() {
// Users can still enable heartbeat feature by setting HeartbeatInterval to a positive value. // Users can still enable heartbeat feature by setting HeartbeatInterval to a positive value.
if ctl.sessionCtx.Common.Transport.HeartbeatInterval > 0 { if ctl.sessionCtx.Common.Transport.HeartbeatInterval > 0 {
// send heartbeat to server // send heartbeat to server
sendHeartBeat := func() error { sendHeartBeat := func() (bool, error) {
xl.Debug("send heartbeat to server") xl.Debug("send heartbeat to server")
pingMsg := &msg.Ping{} pingMsg := &msg.Ping{}
if err := ctl.sessionCtx.AuthSetter.SetPing(pingMsg); err != nil { if err := ctl.sessionCtx.AuthSetter.SetPing(pingMsg); err != nil {
xl.Warn("error during ping authentication: %v, skip sending ping message", err) xl.Warn("error during ping authentication: %v, skip sending ping message", err)
return err return false, err
} }
_ = ctl.msgDispatcher.Send(pingMsg) _ = ctl.msgDispatcher.Send(pingMsg)
return nil return false, nil
} }
go wait.BackoffUntil(sendHeartBeat, go wait.BackoffUntil(sendHeartBeat,

View File

@ -192,16 +192,16 @@ func (svr *Service) keepControllerWorking() {
// the control immediately exits. It is necessary to limit the frequency of reconnection in this case. // the control immediately exits. It is necessary to limit the frequency of reconnection in this case.
// The interval for the first three retries in 1 minute will be very short, and then it will increase exponentially. // The interval for the first three retries in 1 minute will be very short, and then it will increase exponentially.
// The maximum interval is 20 seconds. // The maximum interval is 20 seconds.
wait.BackoffUntil(func() error { wait.BackoffUntil(func() (bool, error) {
// loopLoginUntilSuccess is another layer of loop that will continuously attempt to // loopLoginUntilSuccess is another layer of loop that will continuously attempt to
// login to the server until successful. // login to the server until successful.
svr.loopLoginUntilSuccess(20*time.Second, false) svr.loopLoginUntilSuccess(20*time.Second, false)
if svr.ctl != nil { if svr.ctl != nil {
<-svr.ctl.Done() <-svr.ctl.Done()
return errors.New("control is closed and try another loop") return false, errors.New("control is closed and try another loop")
} }
// If the control is nil, it means that the login failed and the service is also closed. // If the control is nil, it means that the login failed and the service is also closed.
return nil return false, nil
}, wait.NewFastBackoffManager( }, wait.NewFastBackoffManager(
wait.FastBackoffOptions{ wait.FastBackoffOptions{
Duration: time.Second, Duration: time.Second,
@ -282,9 +282,8 @@ func (svr *Service) login() (conn net.Conn, connector Connector, err error) {
func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginExit bool) { func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginExit bool) {
xl := xlog.FromContextSafe(svr.ctx) xl := xlog.FromContextSafe(svr.ctx)
successCh := make(chan struct{})
loginFunc := func() error { loginFunc := func() (bool, error) {
xl.Info("try to connect to server...") xl.Info("try to connect to server...")
conn, connector, err := svr.login() conn, connector, err := svr.login()
if err != nil { if err != nil {
@ -292,7 +291,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
if firstLoginExit { if firstLoginExit {
svr.cancel(cancelErr{Err: err}) svr.cancel(cancelErr{Err: err})
} }
return err return false, err
} }
svr.cfgMu.RLock() svr.cfgMu.RLock()
@ -315,7 +314,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
if err != nil { if err != nil {
conn.Close() conn.Close()
xl.Error("NewControl error: %v", err) xl.Error("NewControl error: %v", err)
return err return false, err
} }
ctl.SetInWorkConnCallback(svr.handleWorkConnCb) ctl.SetInWorkConnCallback(svr.handleWorkConnCb)
@ -328,8 +327,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
svr.ctl = ctl svr.ctl = ctl
svr.ctlMu.Unlock() svr.ctlMu.Unlock()
close(successCh) return true, nil
return nil
} }
// try to reconnect to server until success // try to reconnect to server until success
@ -339,9 +337,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
Factor: 2, Factor: 2,
Jitter: 0.1, Jitter: 0.1,
MaxDuration: maxInterval, MaxDuration: maxInterval,
}), }), true, svr.ctx.Done())
true,
wait.MergeAndCloseOnAnyStopChannel(svr.ctx.Done(), successCh))
} }
func (svr *Service) UpdateAllConfigurer(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error { func (svr *Service) UpdateAllConfigurer(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {

View File

@ -113,7 +113,7 @@ func (f *fastBackoffImpl) Backoff(previousDuration time.Duration, previousCondit
return f.options.Duration return f.options.Duration
} }
func BackoffUntil(f func() error, backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { func BackoffUntil(f func() (bool, error), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var delay time.Duration var delay time.Duration
previousError := false previousError := false
@ -131,7 +131,9 @@ func BackoffUntil(f func() error, backoff BackoffManager, sliding bool, stopCh <
delay = backoff.Backoff(delay, previousError) delay = backoff.Backoff(delay, previousError)
} }
if err := f(); err != nil { if done, err := f(); done {
return
} else if err != nil {
previousError = true previousError = true
} else { } else {
previousError = false previousError = false
@ -170,9 +172,9 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
} }
func Until(f func(), period time.Duration, stopCh <-chan struct{}) { func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
ff := func() error { ff := func() (bool, error) {
f() f()
return nil return false, nil
} }
BackoffUntil(ff, BackoffFunc(func(time.Duration, bool) time.Duration { BackoffUntil(ff, BackoffFunc(func(time.Duration, bool) time.Duration {
return period return period