msg: new message CloseProxy

This commit is contained in:
fatedier 2017-06-11 17:22:05 +08:00
parent 5b303f5148
commit fca7f42b37
3 changed files with 35 additions and 3 deletions

View File

@ -358,6 +358,9 @@ func (ctl *Control) manager() {
pxy := NewProxy(ctl, cfg) pxy := NewProxy(ctl, cfg)
if err := pxy.Run(); err != nil { if err := pxy.Run(); err != nil {
ctl.Warn("[%s] proxy start running error: %v", m.ProxyName, err) ctl.Warn("[%s] proxy start running error: %v", m.ProxyName, err)
ctl.sendCh <- &msg.CloseProxy{
ProxyName: m.ProxyName,
}
continue continue
} }
ctl.proxies[m.ProxyName] = pxy ctl.proxies[m.ProxyName] = pxy

View File

@ -24,6 +24,7 @@ const (
TypeLoginResp = '1' TypeLoginResp = '1'
TypeNewProxy = 'p' TypeNewProxy = 'p'
TypeNewProxyResp = '2' TypeNewProxyResp = '2'
TypeCloseProxy = 'c'
TypeNewWorkConn = 'w' TypeNewWorkConn = 'w'
TypeReqWorkConn = 'r' TypeReqWorkConn = 'r'
TypeStartWorkConn = 's' TypeStartWorkConn = 's'
@ -45,6 +46,7 @@ func init() {
TypeMap[TypeLoginResp] = reflect.TypeOf(LoginResp{}) TypeMap[TypeLoginResp] = reflect.TypeOf(LoginResp{})
TypeMap[TypeNewProxy] = reflect.TypeOf(NewProxy{}) TypeMap[TypeNewProxy] = reflect.TypeOf(NewProxy{})
TypeMap[TypeNewProxyResp] = reflect.TypeOf(NewProxyResp{}) TypeMap[TypeNewProxyResp] = reflect.TypeOf(NewProxyResp{})
TypeMap[TypeCloseProxy] = reflect.TypeOf(CloseProxy{})
TypeMap[TypeNewWorkConn] = reflect.TypeOf(NewWorkConn{}) TypeMap[TypeNewWorkConn] = reflect.TypeOf(NewWorkConn{})
TypeMap[TypeReqWorkConn] = reflect.TypeOf(ReqWorkConn{}) TypeMap[TypeReqWorkConn] = reflect.TypeOf(ReqWorkConn{})
TypeMap[TypeStartWorkConn] = reflect.TypeOf(StartWorkConn{}) TypeMap[TypeStartWorkConn] = reflect.TypeOf(StartWorkConn{})
@ -105,6 +107,10 @@ type NewProxyResp struct {
Error string `json:"error"` Error string `json:"error"`
} }
type CloseProxy struct {
ProxyName string `json:"proxy_name"`
}
type NewWorkConn struct { type NewWorkConn struct {
RunId string `json:"run_id"` RunId string `json:"run_id"`
} }

View File

@ -50,7 +50,7 @@ type Control struct {
workConnCh chan net.Conn workConnCh chan net.Conn
// proxies in one client // proxies in one client
proxies []Proxy proxies map[string]Proxy
// pool count // pool count
poolCount int poolCount int
@ -82,7 +82,7 @@ func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
sendCh: make(chan msg.Message, 10), sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10), readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, loginMsg.PoolCount+10), workConnCh: make(chan net.Conn, loginMsg.PoolCount+10),
proxies: make([]Proxy, 0), proxies: make(map[string]Proxy),
poolCount: loginMsg.PoolCount, poolCount: loginMsg.PoolCount,
lastPing: time.Now(), lastPing: time.Now(),
runId: loginMsg.RunId, runId: loginMsg.RunId,
@ -265,6 +265,8 @@ func (ctl *Control) stoper() {
workConn.Close() workConn.Close()
} }
ctl.mu.Lock()
defer ctl.mu.Unlock()
for _, pxy := range ctl.proxies { for _, pxy := range ctl.proxies {
pxy.Close() pxy.Close()
ctl.svr.DelProxy(pxy.GetName()) ctl.svr.DelProxy(pxy.GetName())
@ -317,6 +319,9 @@ func (ctl *Control) manager() {
StatsNewProxy(m.ProxyName, m.ProxyType) StatsNewProxy(m.ProxyName, m.ProxyType)
} }
ctl.sendCh <- resp ctl.sendCh <- resp
case *msg.CloseProxy:
ctl.CloseProxy(m)
ctl.conn.Info("close proxy [%s] success", m.ProxyName)
case *msg.Ping: case *msg.Ping:
ctl.lastPing = time.Now() ctl.lastPing = time.Now()
ctl.conn.Debug("receive heartbeat") ctl.conn.Debug("receive heartbeat")
@ -355,6 +360,24 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (err error) {
if err != nil { if err != nil {
return err return err
} }
ctl.proxies = append(ctl.proxies, pxy)
ctl.mu.Lock()
ctl.proxies[pxy.GetName()] = pxy
ctl.mu.Unlock()
return nil return nil
} }
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
return
}
pxy.Close()
ctl.svr.DelProxy(pxy.GetName())
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return
}