fix a goroutine leak issue caused by Login plugin timeout (#3547)

This commit is contained in:
fatedier 2023-07-25 15:12:40 +08:00 committed by GitHub
parent 3235addaaa
commit 6430afcfa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 12 deletions

View File

@ -1,3 +1,7 @@
### Features ### Features
* Adds a `completion` command for shell completions. * Adds a `completion` command for shell completions.
### Fixes
* fix a goroutine leak issue caused by Login plugin timeout.

View File

@ -29,7 +29,6 @@ import (
"github.com/fatedier/frp/pkg/auth" "github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/consts"
pkgerr "github.com/fatedier/frp/pkg/errors" pkgerr "github.com/fatedier/frp/pkg/errors"
"github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server" plugin "github.com/fatedier/frp/pkg/plugin/server"
@ -55,13 +54,14 @@ func NewControlManager() *ControlManager {
} }
} }
func (cm *ControlManager) Add(runID string, ctl *Control) (oldCtl *Control) { func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
cm.mu.Lock() cm.mu.Lock()
defer cm.mu.Unlock() defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunID[runID] var ok bool
old, ok = cm.ctlsByRunID[runID]
if ok { if ok {
oldCtl.Replaced(ctl) old.Replaced(ctl)
} }
cm.ctlsByRunID[runID] = ctl cm.ctlsByRunID[runID] = ctl
return return
@ -141,14 +141,13 @@ type Control struct {
// replace old controller instantly. // replace old controller instantly.
runID string runID string
// control status
status string
readerShutdown *shutdown.Shutdown readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown writerShutdown *shutdown.Shutdown
managerShutdown *shutdown.Shutdown managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown
started bool
mu sync.RWMutex mu sync.RWMutex
// Server configuration information // Server configuration information
@ -187,7 +186,6 @@ func NewControl(
portsUsedNum: 0, portsUsedNum: 0,
lastPing: time.Now(), lastPing: time.Now(),
runID: loginMsg.RunID, runID: loginMsg.RunID,
status: consts.Working,
readerShutdown: shutdown.New(), readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(), writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(), managerShutdown: shutdown.New(),
@ -208,11 +206,19 @@ func (ctl *Control) Start() {
Error: "", Error: "",
} }
_ = msg.WriteMsg(ctl.conn, loginRespMsg) _ = msg.WriteMsg(ctl.conn, loginRespMsg)
ctl.mu.Lock()
ctl.started = true
ctl.mu.Unlock()
go ctl.writer() go ctl.writer()
for i := 0; i < ctl.poolCount; i++ { go func() {
ctl.sendCh <- &msg.ReqWorkConn{} for i := 0; i < ctl.poolCount; i++ {
} // ignore error here, that means that this control is closed
_ = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
}
}()
go ctl.manager() go ctl.manager()
go ctl.reader() go ctl.reader()
@ -418,6 +424,14 @@ func (ctl *Control) stoper() {
// block until Control closed // block until Control closed
func (ctl *Control) WaitClosed() { func (ctl *Control) WaitClosed() {
ctl.mu.RLock()
started := ctl.started
ctl.mu.RUnlock()
if !started {
ctl.allShutdown.Done()
return
}
ctl.allShutdown.WaitDone() ctl.allShutdown.WaitDone()
} }

View File

@ -552,7 +552,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg) ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil { if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone() oldCtl.WaitClosed()
} }
ctl.Start() ctl.Start()