udp proxy: fix reconnect error, fix #209

This commit is contained in:
fatedier 2017-01-05 23:07:58 +08:00
parent 1868b3bafb
commit 1f49510e3e
3 changed files with 45 additions and 5 deletions

View File

@ -55,15 +55,24 @@ func msgReader(cli *client.ProxyClient, c *conn.Conn, msgSendChan chan interface
var heartbeatTimeout bool = false var heartbeatTimeout bool = false
timer := time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, func() { timer := time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, func() {
heartbeatTimeout = true heartbeatTimeout = true
c.Close() if c != nil {
c.Close()
}
if cli != nil {
// if it's not udp type, nothing will happen
cli.CloseUdpTunnel()
cli.SetCloseFlag(true)
}
log.Error("ProxyName [%s], heartbeatRes from frps timeout", cli.Name) log.Error("ProxyName [%s], heartbeatRes from frps timeout", cli.Name)
}) })
defer timer.Stop() defer timer.Stop()
for { for {
buf, err := c.ReadLine() buf, err := c.ReadLine()
if err == io.EOF || c == nil || c.IsClosed() { if err == io.EOF || c.IsClosed() {
timer.Stop()
c.Close() c.Close()
cli.SetCloseFlag(true)
log.Warn("ProxyName [%s], frps close this control conn!", cli.Name) log.Warn("ProxyName [%s], frps close this control conn!", cli.Name)
var delayTime time.Duration = 1 var delayTime time.Duration = 1
@ -76,11 +85,14 @@ func msgReader(cli *client.ProxyClient, c *conn.Conn, msgSendChan chan interface
msgSendChan = make(chan interface{}, 1024) msgSendChan = make(chan interface{}, 1024)
go heartbeatSender(c, msgSendChan) go heartbeatSender(c, msgSendChan)
go msgSender(cli, c, msgSendChan) go msgSender(cli, c, msgSendChan)
cli.SetCloseFlag(false)
break break
} }
if delayTime < 60 { if delayTime < 30 {
delayTime = delayTime * 2 delayTime = delayTime * 2
} else {
delayTime = 30
} }
time.Sleep(delayTime * time.Second) time.Sleep(delayTime * time.Second)
} }

View File

@ -85,7 +85,9 @@ func controlWorker(c *conn.Conn) {
return return
} }
} else { } else {
closeFlag = false if ret == 0 {
closeFlag = false
}
return return
} }

View File

@ -39,6 +39,9 @@ type ProxyClient struct {
udpTunnel *conn.Conn udpTunnel *conn.Conn
once sync.Once once sync.Once
closeFlag bool
mutex sync.RWMutex
} }
// if proxy type is udp, keep a tcp connection for transferring udp packages // if proxy type is udp, keep a tcp connection for transferring udp packages
@ -48,7 +51,7 @@ func (pc *ProxyClient) StartUdpTunnelOnce(addr string, port int64) {
var c *conn.Conn var c *conn.Conn
udpProcessor := NewUdpProcesser(nil, pc.LocalIp, pc.LocalPort) udpProcessor := NewUdpProcesser(nil, pc.LocalIp, pc.LocalPort)
for { for {
if pc.udpTunnel == nil || pc.udpTunnel.IsClosed() { if !pc.IsClosed() && (pc.udpTunnel == nil || pc.udpTunnel.IsClosed()) {
if HttpProxy == "" { if HttpProxy == "" {
c, err = conn.ConnectServer(fmt.Sprintf("%s:%d", addr, port)) c, err = conn.ConnectServer(fmt.Sprintf("%s:%d", addr, port))
} else { } else {
@ -82,8 +85,11 @@ func (pc *ProxyClient) StartUdpTunnelOnce(addr string, port int64) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
continue continue
} }
pc.mutex.Lock()
pc.udpTunnel = c pc.udpTunnel = c
udpProcessor.UpdateTcpConn(pc.udpTunnel) udpProcessor.UpdateTcpConn(pc.udpTunnel)
pc.mutex.Unlock()
udpProcessor.Run() udpProcessor.Run()
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -91,6 +97,14 @@ func (pc *ProxyClient) StartUdpTunnelOnce(addr string, port int64) {
}) })
} }
func (pc *ProxyClient) CloseUdpTunnel() {
pc.mutex.RLock()
defer pc.mutex.RUnlock()
if pc.udpTunnel != nil {
pc.udpTunnel.Close()
}
}
func (pc *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { func (pc *ProxyClient) GetLocalConn() (c *conn.Conn, err error) {
c, err = conn.ConnectServer(fmt.Sprintf("%s:%d", pc.LocalIp, pc.LocalPort)) c, err = conn.ConnectServer(fmt.Sprintf("%s:%d", pc.LocalIp, pc.LocalPort))
if err != nil { if err != nil {
@ -158,3 +172,15 @@ func (pc *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err err
return nil return nil
} }
func (pc *ProxyClient) SetCloseFlag(closeFlag bool) {
pc.mutex.Lock()
defer pc.mutex.Unlock()
pc.closeFlag = closeFlag
}
func (pc *ProxyClient) IsClosed() bool {
pc.mutex.RLock()
defer pc.mutex.RUnlock()
return pc.closeFlag
}