close all proxies if protocol = kcp

This commit is contained in:
fatedier 2017-06-27 01:59:30 +08:00
parent b811a620c3
commit aede4e54f8
4 changed files with 57 additions and 3 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/crypto" "github.com/fatedier/frp/utils/crypto"
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/log" "github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/util"
@ -69,8 +70,8 @@ type Control struct {
// run id got from server // run id got from server
runId string runId string
// connection or other error happens , control will try to reconnect to server // if we call close() in control, do not reconnect to server
closed int32 exit bool
// goroutines can block by reading from this channel, it will be closed only in reader() when control connection is closed // goroutines can block by reading from this channel, it will be closed only in reader() when control connection is closed
closedCh chan int closedCh chan int
@ -181,7 +182,10 @@ func (ctl *Control) NewWorkConn() {
workConn.AddLogPrefix(startMsg.ProxyName) workConn.AddLogPrefix(startMsg.ProxyName)
// dispatch this work connection to related proxy // dispatch this work connection to related proxy
if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok { ctl.mu.RLock()
pxy, ok := ctl.proxies[startMsg.ProxyName]
ctl.mu.RUnlock()
if ok {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn) go pxy.InWorkConn(workConn)
} else { } else {
@ -189,6 +193,20 @@ func (ctl *Control) NewWorkConn() {
} }
} }
func (ctl *Control) Close() error {
ctl.mu.Lock()
ctl.exit = true
err := errors.PanicToError(func() {
for name, _ := range ctl.proxies {
ctl.sendCh <- &msg.CloseProxy{
ProxyName: name,
}
}
})
ctl.mu.Unlock()
return err
}
func (ctl *Control) init() { func (ctl *Control) init() {
ctl.sendCh = make(chan msg.Message, 10) ctl.sendCh = make(chan msg.Message, 10)
ctl.readCh = make(chan msg.Message, 10) ctl.readCh = make(chan msg.Message, 10)
@ -377,7 +395,10 @@ func (ctl *Control) manager() {
ctl.Warn("[%s] no proxy conf found", m.ProxyName) ctl.Warn("[%s] no proxy conf found", m.ProxyName)
continue continue
} }
ctl.mu.RLock()
oldPxy, ok := ctl.proxies[m.ProxyName] oldPxy, ok := ctl.proxies[m.ProxyName]
ctl.mu.RUnlock()
if ok { if ok {
oldPxy.Close() oldPxy.Close()
} }
@ -389,7 +410,9 @@ func (ctl *Control) manager() {
} }
continue continue
} }
ctl.mu.Lock()
ctl.proxies[m.ProxyName] = pxy ctl.proxies[m.ProxyName] = pxy
ctl.mu.Unlock()
ctl.Info("[%s] start proxy success", m.ProxyName) ctl.Info("[%s] start proxy success", m.ProxyName)
case *msg.Pong: case *msg.Pong:
ctl.lastPong = time.Now() ctl.lastPong = time.Now()
@ -429,6 +452,14 @@ func (ctl *Control) controler() {
for _, pxy := range ctl.proxies { for _, pxy := range ctl.proxies {
pxy.Close() pxy.Close()
} }
// if ctl.exit is true, just exit
ctl.mu.RLock()
exit := ctl.exit
ctl.mu.RUnlock()
if exit {
return
}
time.Sleep(time.Second) time.Sleep(time.Second)
// loop util reconnect to server success // loop util reconnect to server success

View File

@ -41,3 +41,7 @@ func (svr *Service) Run() error {
<-svr.closedCh <-svr.closedCh
return nil return nil
} }
func (svr *Service) Close() error {
return svr.ctl.Close()
}

View File

@ -17,8 +17,11 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"os/signal"
"strconv" "strconv"
"strings" "strings"
"syscall"
"time"
docopt "github.com/docopt/docopt-go" docopt "github.com/docopt/docopt-go"
ini "github.com/vaughan0/go-ini" ini "github.com/vaughan0/go-ini"
@ -116,9 +119,24 @@ func main() {
config.ClientCommonCfg.LogLevel, config.ClientCommonCfg.LogMaxDays) config.ClientCommonCfg.LogLevel, config.ClientCommonCfg.LogMaxDays)
svr := client.NewService(pxyCfgs, vistorCfgs) svr := client.NewService(pxyCfgs, vistorCfgs)
// Capture the exit signal if we use kcp.
if config.ClientCommonCfg.Protocol == "kcp" {
go HandleSignal(svr)
}
err = svr.Run() err = svr.Run()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
} }
func HandleSignal(svr *client.Service) {
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
svr.Close()
time.Sleep(250 * time.Millisecond)
os.Exit(0)
}

View File

@ -378,6 +378,7 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
pxy.Close() pxy.Close()
ctl.svr.DelProxy(pxy.GetName()) ctl.svr.DelProxy(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName)
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return return
} }