update mutex used in frpc control

This commit is contained in:
fatedier 2017-06-27 23:30:09 +08:00
parent aede4e54f8
commit b55a24a27e
1 changed files with 42 additions and 21 deletions

View File

@ -137,13 +137,6 @@ func (ctl *Control) Run() (err error) {
go ctl.writer() go ctl.writer()
go ctl.reader() go ctl.reader()
// send NewProxy message for all configured proxies
for _, cfg := range ctl.pxyCfgs {
var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg)
ctl.sendCh <- &newProxyMsg
}
// start all local vistors // start all local vistors
for _, cfg := range ctl.vistorCfgs { for _, cfg := range ctl.vistorCfgs {
vistor := NewVistor(ctl, cfg) vistor := NewVistor(ctl, cfg)
@ -155,6 +148,13 @@ func (ctl *Control) Run() (err error) {
ctl.vistors[cfg.GetName()] = vistor ctl.vistors[cfg.GetName()] = vistor
vistor.Info("start vistor success") vistor.Info("start vistor success")
} }
// send NewProxy message for all configured proxies
for _, cfg := range ctl.pxyCfgs {
var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg)
ctl.sendCh <- &newProxyMsg
}
return nil return nil
} }
@ -165,7 +165,7 @@ func (ctl *Control) NewWorkConn() {
} }
m := &msg.NewWorkConn{ m := &msg.NewWorkConn{
RunId: ctl.runId, RunId: ctl.getRunId(),
} }
if err = msg.WriteMsg(workConn, m); err != nil { if err = msg.WriteMsg(workConn, m); err != nil {
ctl.Warn("work connection write to server error: %v", err) ctl.Warn("work connection write to server error: %v", err)
@ -182,9 +182,7 @@ func (ctl *Control) NewWorkConn() {
workConn.AddLogPrefix(startMsg.ProxyName) workConn.AddLogPrefix(startMsg.ProxyName)
// dispatch this work connection to related proxy // dispatch this work connection to related proxy
ctl.mu.RLock() pxy, ok := ctl.getProxy(startMsg.ProxyName)
pxy, ok := ctl.proxies[startMsg.ProxyName]
ctl.mu.RUnlock()
if ok { if ok {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn) go pxy.InWorkConn(workConn)
@ -251,7 +249,7 @@ func (ctl *Control) login() (err error) {
now := time.Now().Unix() now := time.Now().Unix()
ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now) ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now)
ctl.loginMsg.Timestamp = now ctl.loginMsg.Timestamp = now
ctl.loginMsg.RunId = ctl.runId ctl.loginMsg.RunId = ctl.getRunId()
if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil { if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil {
return err return err
@ -272,7 +270,7 @@ func (ctl *Control) login() (err error) {
ctl.conn = conn ctl.conn = conn
// update runId got from server // update runId got from server
ctl.runId = loginRespMsg.RunId ctl.setRunId(loginRespMsg.RunId)
ctl.ClearLogPrefix() ctl.ClearLogPrefix()
ctl.AddLogPrefix(loginRespMsg.RunId) ctl.AddLogPrefix(loginRespMsg.RunId)
ctl.Info("login to server success, get run id [%s]", loginRespMsg.RunId) ctl.Info("login to server success, get run id [%s]", loginRespMsg.RunId)
@ -349,6 +347,7 @@ func (ctl *Control) writer() {
} }
} }
// manager handles all channel events and do corresponding process
func (ctl *Control) manager() { func (ctl *Control) manager() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
@ -396,9 +395,7 @@ func (ctl *Control) manager() {
continue continue
} }
ctl.mu.RLock() oldPxy, ok := ctl.getProxy(m.ProxyName)
oldPxy, ok := ctl.proxies[m.ProxyName]
ctl.mu.RUnlock()
if ok { if ok {
oldPxy.Close() oldPxy.Close()
} }
@ -410,9 +407,7 @@ func (ctl *Control) manager() {
} }
continue continue
} }
ctl.mu.Lock() ctl.addProxy(m.ProxyName, pxy)
ctl.proxies[m.ProxyName] = pxy
ctl.mu.Unlock()
ctl.Info("[%s] start proxy success", m.ProxyName) ctl.Info("[%s] start proxy success", m.ProxyName)
case *msg.Pong: case *msg.Pong:
ctl.lastPong = time.Now() ctl.lastPong = time.Now()
@ -422,7 +417,8 @@ func (ctl *Control) manager() {
} }
} }
// control keep watching closedCh, start a new connection if previous control connection is closed // controler keep watching closedCh, start a new connection if previous control connection is closed.
// If controler is notified by closedCh, reader and writer and manager will exit, then recall these functions.
func (ctl *Control) controler() { func (ctl *Control) controler() {
var err error var err error
maxDelayTime := 30 * time.Second maxDelayTime := 30 * time.Second
@ -435,7 +431,7 @@ func (ctl *Control) controler() {
case <-checkProxyTicker.C: case <-checkProxyTicker.C:
// Every 30 seconds, check which proxy registered failed and reregister it to server. // Every 30 seconds, check which proxy registered failed and reregister it to server.
for _, cfg := range ctl.pxyCfgs { for _, cfg := range ctl.pxyCfgs {
if _, exist := ctl.proxies[cfg.GetName()]; !exist { if _, exist := ctl.getProxy(cfg.GetName()); !exist {
ctl.Info("try to reregister proxy [%s]", cfg.GetName()) ctl.Info("try to reregister proxy [%s]", cfg.GetName())
var newProxyMsg msg.NewProxy var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg) cfg.UnMarshalToMsg(&newProxyMsg)
@ -501,3 +497,28 @@ func (ctl *Control) controler() {
} }
} }
} }
func (ctl *Control) setRunId(runId string) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
ctl.runId = runId
}
func (ctl *Control) getRunId() string {
ctl.mu.RLock()
defer ctl.mu.RUnlock()
return ctl.runId
}
func (ctl *Control) getProxy(name string) (pxy Proxy, ok bool) {
ctl.mu.RLock()
defer ctl.mu.RUnlock()
pxy, ok = ctl.proxies[name]
return
}
func (ctl *Control) addProxy(name string, pxy Proxy) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
ctl.proxies[name] = pxy
}