all: improvement in utils/conn

This commit is contained in:
fatedier 2016-02-25 10:28:34 +08:00
parent 26479cf92a
commit 42e062b3b5
6 changed files with 59 additions and 33 deletions

View File

@ -14,8 +14,6 @@ import (
"github.com/fatedier/frp/utils/log" "github.com/fatedier/frp/utils/log"
) )
var isHeartBeatContinue bool = true
func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
defer wait.Done() defer wait.Done()
@ -30,9 +28,10 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
// ignore response content now // ignore response content now
_, err := c.ReadLine() _, err := c.ReadLine()
if err == io.EOF { if err == io.EOF {
isHeartBeatContinue = false
log.Debug("ProxyName [%s], server close this control conn", cli.Name) log.Debug("ProxyName [%s], server close this control conn", cli.Name)
var sleepTime time.Duration = 1 var sleepTime time.Duration = 1
// loop until connect to server
for { for {
log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
tmpConn, err := loginToServer(cli) tmpConn, err := loginToServer(cli)
@ -114,5 +113,5 @@ func startHeartBeat(c *conn.Conn) {
break break
} }
} }
log.Info("heartbeat exit") log.Debug("heartbeat exit")
} }

View File

@ -15,7 +15,10 @@ import (
func ProcessControlConn(l *conn.Listener) { func ProcessControlConn(l *conn.Listener) {
for { for {
c := l.GetConn() c, err := l.GetConn()
if err != nil {
return
}
log.Debug("Get one new conn, %v", c.GetRemoteAddr()) log.Debug("Get one new conn, %v", c.GetRemoteAddr())
go controlWorker(c) go controlWorker(c)
} }
@ -47,7 +50,6 @@ func controlWorker(c *conn.Conn) {
} }
if needRes { if needRes {
// control conn
defer c.Close() defer c.Close()
buf, _ := json.Marshal(clientCtlRes) buf, _ := json.Marshal(clientCtlRes)
@ -62,7 +64,7 @@ func controlWorker(c *conn.Conn) {
return return
} }
// others is from server to client // other messages is from server to client
s, ok := server.ProxyServers[clientCtlReq.ProxyName] s, ok := server.ProxyServers[clientCtlReq.ProxyName]
if !ok { if !ok {
log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
@ -138,7 +140,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne
return return
} }
s.CliConnChan <- c s.GetNewCliConn(c)
} else { } else {
info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
log.Warn(info) log.Warn(info)
@ -153,8 +155,8 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
isContinueRead := true isContinueRead := true
f := func() { f := func() {
isContinueRead = false isContinueRead = false
c.Close()
s.Close() s.Close()
log.Error("ProxyName [%s], client heartbeat timeout", s.Name)
} }
timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
defer timer.Stop() defer timer.Stop()
@ -164,13 +166,17 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
log.Warn("ProxyName [%s], client is dead!", s.Name) log.Warn("ProxyName [%s], client is dead!", s.Name)
c.Close()
s.Close() s.Close()
break break
} else if c.IsClosed() {
log.Warn("ProxyName [%s], client connection is closed", s.Name)
break
} }
log.Error("ProxyName [%s], read error: %v", s.Name, err) log.Error("ProxyName [%s], read error: %v", s.Name, err)
continue continue
} }
log.Debug("ProxyName [%s], get heartbeat", s.Name)
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
} }

View File

@ -1,7 +1,7 @@
# common是必须的section # common是必须的section
[common] [common]
server_addr = 127.0.0.1 server_addr = 127.0.0.1
bind_port = 7000 server_port = 7000
log_file = ./frpc.log log_file = ./frpc.log
# debug, info, warn, error # debug, info, warn, error
log_level = debug log_level = debug

View File

@ -15,6 +15,7 @@ var (
LogLevel string = "warn" LogLevel string = "warn"
LogWay string = "file" LogWay string = "file"
HeartBeatTimeout int64 = 30 HeartBeatTimeout int64 = 30
UserConnTimeout int64 = 10
) )
var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer) var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer)

View File

@ -3,6 +3,7 @@ package server
import ( import (
"container/list" "container/list"
"sync" "sync"
"time"
"github.com/fatedier/frp/models/consts" "github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/utils/conn" "github.com/fatedier/frp/utils/conn"
@ -15,17 +16,17 @@ type ProxyServer struct {
BindAddr string BindAddr string
ListenPort int64 ListenPort int64
Status int64 Status int64
CliConnChan chan *conn.Conn // get client conns from control goroutine
listener *conn.Listener // accept new connection from remote users listener *conn.Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
cliConnChan chan *conn.Conn // get client conns from control goroutine
userConnList *list.List // store user conns userConnList *list.List // store user conns
mutex sync.Mutex mutex sync.Mutex
} }
func (p *ProxyServer) Init() { func (p *ProxyServer) Init() {
p.Status = consts.Idle p.Status = consts.Idle
p.CliConnChan = make(chan *conn.Conn) p.cliConnChan = make(chan *conn.Conn)
p.ctlMsgChan = make(chan int64) p.ctlMsgChan = make(chan int64)
p.userConnList = list.New() p.userConnList = list.New()
} }
@ -48,13 +49,13 @@ func (p *ProxyServer) Start() (err error) {
p.Status = consts.Working p.Status = consts.Working
// start a goroutine for listener // start a goroutine for listener to accept user connection
go func() { go func() {
for { for {
// block // block
// if listener is closed, get nil // if listener is closed, err returned
c := p.listener.GetConn() c, err := p.listener.GetConn()
if c == nil { if err != nil {
log.Info("ProxyName [%s], listener is closed", p.Name) log.Info("ProxyName [%s], listener is closed", p.Name)
return return
} }
@ -73,13 +74,28 @@ func (p *ProxyServer) Start() (err error) {
// put msg to control conn // put msg to control conn
p.ctlMsgChan <- 1 p.ctlMsgChan <- 1
// set timeout
time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
p.Lock()
defer p.Unlock()
element := p.userConnList.Front()
if element == nil {
return
}
userConn := element.Value.(*conn.Conn)
if userConn == c {
log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr())
}
})
} }
}() }()
// start another goroutine for join two conns from client and user // start another goroutine for join two conns from client and user
go func() { go func() {
for { for {
cliConn, ok := <-p.CliConnChan cliConn, ok := <-p.cliConnChan
if !ok { if !ok {
return return
} }
@ -114,7 +130,7 @@ func (p *ProxyServer) Close() {
p.Status = consts.Idle p.Status = consts.Idle
p.listener.Close() p.listener.Close()
close(p.ctlMsgChan) close(p.ctlMsgChan)
close(p.CliConnChan) close(p.cliConnChan)
p.userConnList = list.New() p.userConnList = list.New()
p.Unlock() p.Unlock()
} }
@ -128,3 +144,7 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
} }
return return
} }
func (p *ProxyServer) GetNewCliConn(c *conn.Conn) {
p.cliConnChan <- c
}

View File

@ -52,15 +52,15 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {
return l, err return l, err
} }
// wait util get one new connection or close // wait util get one new connection or listener is closed
// if listener is closed, return nil // if listener is closed, err returned
func (l *Listener) GetConn() (conn *Conn) { func (l *Listener) GetConn() (conn *Conn, err error) {
var ok bool var ok bool
conn, ok = <-l.conns conn, ok = <-l.conns
if !ok { if !ok {
return nil return conn, fmt.Errorf("channel close")
} }
return conn return conn, nil
} }
func (l *Listener) Close() { func (l *Listener) Close() {
@ -116,7 +116,7 @@ func (c *Conn) Write(content string) (err error) {
} }
func (c *Conn) Close() { func (c *Conn) Close() {
if c.TcpConn != nil { if c.TcpConn != nil && c.closeFlag == false {
c.closeFlag = true c.closeFlag = true
c.TcpConn.Close() c.TcpConn.Close()
} }