From 69b09eb8a2052c6a29a6b1c628fe64055c707b45 Mon Sep 17 00:00:00 2001 From: fatedier Date: Mon, 15 May 2017 00:08:21 +0800 Subject: [PATCH] udp: add heartbeat in udp work connection --- Makefile.cross-compiles | 3 ++ client/proxy.go | 51 ++++++++++++++++++++++++---------- conf/frpc.ini | 6 ++-- conf/frps.ini | 4 +-- models/config/client_common.go | 4 +-- models/config/server_common.go | 2 +- models/proto/udp/udp.go | 2 +- server/proxy.go | 38 ++++++++++++++++--------- 8 files changed, 74 insertions(+), 36 deletions(-) diff --git a/Makefile.cross-compiles b/Makefile.cross-compiles index b749fbb..10f700a 100644 --- a/Makefile.cross-compiles +++ b/Makefile.cross-compiles @@ -27,3 +27,6 @@ app: env CGO_ENABLED=0 GOOS=linux GOARCH=mips go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mips ./cmd/frps env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frpc_linux_mipsle ./cmd/frpc env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mipsle ./cmd/frps + +temp: + env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./frps_linux_amd64 ./cmd/frps diff --git a/client/proxy.go b/client/proxy.go index 754f070..416f85e 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -19,6 +19,7 @@ import ( "io" "net" "sync" + "time" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" @@ -141,8 +142,10 @@ type UdpProxy struct { localAddr *net.UDPAddr readCh chan *msg.UdpPacket - sendCh chan *msg.UdpPacket - workConn frpNet.Conn + + // include msg.UdpPacket and msg.Ping + sendCh chan msg.Message + workConn frpNet.Conn } func (pxy *UdpProxy) Run() (err error) { @@ -172,18 +175,18 @@ func (pxy *UdpProxy) Close() { } func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { - pxy.Info("incoming a new work connection for udp proxy") + pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String()) // close resources releated with old workConn pxy.Close() pxy.mu.Lock() pxy.workConn = conn - pxy.readCh = make(chan *msg.UdpPacket, 64) - pxy.sendCh = make(chan *msg.UdpPacket, 64) + pxy.readCh = make(chan *msg.UdpPacket, 1024) + pxy.sendCh = make(chan msg.Message, 1024) pxy.closed = false pxy.mu.Unlock() - workConnReaderFn := func(conn net.Conn) { + workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) { for { var udpMsg msg.UdpPacket if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { @@ -192,26 +195,46 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { } if errRet := errors.PanicToError(func() { pxy.Trace("get udp package from workConn: %s", udpMsg.Content) - pxy.readCh <- &udpMsg + readCh <- &udpMsg }); errRet != nil { pxy.Info("reader goroutine for udp work connection closed: %v", errRet) return } } } - workConnSenderFn := func(conn net.Conn) { + workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { + defer func() { + pxy.Info("writer goroutine for udp work connection closed") + }() var errRet error - for udpMsg := range pxy.sendCh { - pxy.Trace("send udp package to workConn: %s", udpMsg.Content) - if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { - pxy.Info("sender goroutine for udp work connection closed") + for rawMsg := range sendCh { + switch m := rawMsg.(type) { + case *msg.UdpPacket: + pxy.Trace("send udp package to workConn: %s", m.Content) + case *msg.Ping: + pxy.Trace("send ping message to udp workConn") + } + if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { + pxy.Error("udp work write error: %v", errRet) return } } } + heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) { + var errRet error + for { + time.Sleep(time.Duration(30) * time.Second) + if errRet = errors.PanicToError(func() { + sendCh <- &msg.Ping{} + }); errRet != nil { + pxy.Trace("heartbeat goroutine for udp work connection closed") + } + } + } - go workConnSenderFn(pxy.workConn) - go workConnReaderFn(pxy.workConn) + go workConnSenderFn(pxy.workConn, pxy.sendCh) + go workConnReaderFn(pxy.workConn, pxy.readCh) + go heartbeatFn(pxy.workConn, pxy.sendCh) udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh) } diff --git a/conf/frpc.ini b/conf/frpc.ini index f32d35f..16bb0ad 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -26,9 +26,9 @@ pool_count = 5 user = your_name # heartbeat configure, it's not recommended to modify the default value -# the default value of heartbeat_interval is 10 and heartbeat_timeout is 30 -# heartbeat_interval = 10 -# heartbeat_timeout = 30 +# the default value of heartbeat_interval is 10 and heartbeat_timeout is 90 +# heartbeat_interval = 30 +# heartbeat_timeout = 90 # ssh is the proxy name same as server's configuration # if user in [common] section is not empty, it will be changed to {user}.{proxy} such as your_name.ssh diff --git a/conf/frps.ini b/conf/frps.ini index cd8ca01..6b50116 100644 --- a/conf/frps.ini +++ b/conf/frps.ini @@ -30,8 +30,8 @@ log_max_days = 3 privilege_token = 12345678 # heartbeat configure, it's not recommended to modify the default value -# the default value of heartbeat_timeout is 30 -# heartbeat_timeout = 30 +# the default value of heartbeat_timeout is 90 +# heartbeat_timeout = 90 # only allow frpc to bind ports you list, if you set nothing, there won't be any limit privilege_allow_ports = 2000-3000,3001,3003,4000-50000 diff --git a/models/config/client_common.go b/models/config/client_common.go index 2c22312..6bd8f35 100644 --- a/models/config/client_common.go +++ b/models/config/client_common.go @@ -54,8 +54,8 @@ func GetDeaultClientCommonConf() *ClientCommonConf { PrivilegeToken: "", PoolCount: 1, User: "", - HeartBeatInterval: 10, - HeartBeatTimeout: 30, + HeartBeatInterval: 30, + HeartBeatTimeout: 90, } } diff --git a/models/config/server_common.go b/models/config/server_common.go index 9a07795..eff4d6d 100644 --- a/models/config/server_common.go +++ b/models/config/server_common.go @@ -77,7 +77,7 @@ func GetDefaultServerCommonConf() *ServerCommonConf { AuthTimeout: 900, SubDomainHost: "", MaxPoolCount: 10, - HeartBeatTimeout: 30, + HeartBeatTimeout: 90, UserConnTimeout: 10, } } diff --git a/models/proto/udp/udp.go b/models/proto/udp/udp.go index a7d42c1..fe548f1 100644 --- a/models/proto/udp/udp.go +++ b/models/proto/udp/udp.go @@ -69,7 +69,7 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh return } -func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- *msg.UdpPacket) { +func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- msg.Message) { var ( mu sync.RWMutex ) diff --git a/server/proxy.go b/server/proxy.go index 62cd37c..5418353 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -310,16 +310,21 @@ func (pxy *UdpProxy) Run() (err error) { pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) pxy.udpConn = udpConn - pxy.sendCh = make(chan *msg.UdpPacket, 64) - pxy.readCh = make(chan *msg.UdpPacket, 64) + pxy.sendCh = make(chan *msg.UdpPacket, 1024) + pxy.readCh = make(chan *msg.UdpPacket, 1024) pxy.checkCloseCh = make(chan int) // read message from workConn, if it returns any error, notify proxy to start a new workConn workConnReaderFn := func(conn net.Conn) { for { - var udpMsg msg.UdpPacket + var ( + rawMsg msg.Message + errRet error + ) pxy.Trace("loop waiting message from udp workConn") - if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { + // client will send heartbeat in workConn for keeping alive + conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) + if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { pxy.Warn("read from workConn for udp error: %v", errRet) conn.Close() // notify proxy to start a new work connection @@ -329,14 +334,21 @@ func (pxy *UdpProxy) Run() (err error) { }) return } - if errRet := errors.PanicToError(func() { - pxy.Trace("get udp message from workConn: %s", udpMsg.Content) - pxy.readCh <- &udpMsg - StatsAddTrafficOut(pxy.GetName(), int64(len(udpMsg.Content))) - }); errRet != nil { - conn.Close() - pxy.Info("reader goroutine for udp work connection closed") - return + conn.SetReadDeadline(time.Time{}) + switch m := rawMsg.(type) { + case *msg.Ping: + pxy.Trace("udp work conn get ping message") + continue + case *msg.UdpPacket: + if errRet := errors.PanicToError(func() { + pxy.Trace("get udp message from workConn: %s", m.Content) + pxy.readCh <- m + StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content))) + }); errRet != nil { + conn.Close() + pxy.Info("reader goroutine for udp work connection closed") + return + } } } } @@ -348,7 +360,7 @@ func (pxy *UdpProxy) Run() (err error) { select { case udpMsg, ok := <-pxy.sendCh: if !ok { - pxy.Info("sender goroutine for udp work condition closed") + pxy.Info("sender goroutine for udp work connection closed") return } if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {