diff --git a/Makefile b/Makefile index f737d3f..cc6ee6f 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ file: fmt: go fmt ./... - + frps: go build -o bin/frps ./cmd/frps diff --git a/client/control.go b/client/control.go index 30992f9..5589817 100644 --- a/client/control.go +++ b/client/control.go @@ -15,9 +15,11 @@ package client import ( + "context" "crypto/tls" "fmt" "io" + "net" "runtime/debug" "sync" "time" @@ -25,8 +27,8 @@ import ( "github.com/fatedier/frp/client/proxy" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" @@ -45,7 +47,7 @@ type Control struct { vm *VisitorManager // control connection - conn frpNet.Conn + conn net.Conn // tcp stream multiplexing, if enabled session *fmux.Session @@ -76,12 +78,19 @@ type Control struct { mu sync.RWMutex - log.Logger + xl *xlog.Logger + + // service context + ctx context.Context } -func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, clientCfg config.ClientCommonConf, - pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf, serverUDPPort int) *Control { +func NewControl(ctx context.Context, runId string, conn net.Conn, session *fmux.Session, + clientCfg config.ClientCommonConf, + pxyCfgs map[string]config.ProxyConf, + visitorCfgs map[string]config.VisitorConf, + serverUDPPort int) *Control { + // new xlog instance ctl := &Control{ runId: runId, conn: conn, @@ -96,11 +105,12 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, clientCfg writerShutdown: shutdown.New(), msgHandlerShutdown: shutdown.New(), serverUDPPort: serverUDPPort, - Logger: log.NewPrefixLogger(""), + xl: xlog.FromContextSafe(ctx), + ctx: ctx, } - ctl.pm = proxy.NewProxyManager(ctl.sendCh, runId, clientCfg, serverUDPPort) + ctl.pm = proxy.NewProxyManager(ctl.ctx, ctl.sendCh, clientCfg, serverUDPPort) - ctl.vm = NewVisitorManager(ctl) + ctl.vm = NewVisitorManager(ctl.ctx, ctl) ctl.vm.Reload(visitorCfgs) return ctl } @@ -117,6 +127,7 @@ func (ctl *Control) Run() { } func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) { + xl := ctl.xl workConn, err := ctl.connectServer() if err != nil { return @@ -126,31 +137,31 @@ func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) { RunId: ctl.runId, } if err = msg.WriteMsg(workConn, m); err != nil { - ctl.Warn("work connection write to server error: %v", err) + xl.Warn("work connection write to server error: %v", err) workConn.Close() return } var startMsg msg.StartWorkConn if err = msg.ReadMsgInto(workConn, &startMsg); err != nil { - ctl.Error("work connection closed, %v", err) + xl.Error("work connection closed before response StartWorkConn message: %v", err) workConn.Close() return } - workConn.AddLogPrefix(startMsg.ProxyName) // dispatch this work connection to related proxy ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg) } func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { + xl := ctl.xl // Server will return NewProxyResp message to each NewProxy message. // Start a new proxy handler if no error got err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error) if err != nil { - ctl.Warn("[%s] start error: %v", inMsg.ProxyName, err) + xl.Warn("[%s] start error: %v", inMsg.ProxyName, err) } else { - ctl.Info("[%s] start proxy success", inMsg.ProxyName) + xl.Info("[%s] start proxy success", inMsg.ProxyName) } } @@ -169,15 +180,16 @@ func (ctl *Control) ClosedDoneCh() <-chan struct{} { } // connectServer return a new connection to frps -func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { +func (ctl *Control) connectServer() (conn net.Conn, err error) { + xl := ctl.xl if ctl.clientCfg.TcpMux { stream, errRet := ctl.session.OpenStream() if errRet != nil { err = errRet - ctl.Warn("start new connection to server error: %v", err) + xl.Warn("start new connection to server error: %v", err) return } - conn = frpNet.WrapConn(stream) + conn = stream } else { var tlsConfig *tls.Config if ctl.clientCfg.TLSEnable { @@ -188,7 +200,7 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { conn, err = frpNet.ConnectServerByProxyWithTLS(ctl.clientCfg.HttpProxy, ctl.clientCfg.Protocol, fmt.Sprintf("%s:%d", ctl.clientCfg.ServerAddr, ctl.clientCfg.ServerPort), tlsConfig) if err != nil { - ctl.Warn("start new connection to server error: %v", err) + xl.Warn("start new connection to server error: %v", err) return } } @@ -197,10 +209,11 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { // reader read all messages from frps and send to readCh func (ctl *Control) reader() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.Error("panic error: %v", err) - ctl.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() defer ctl.readerShutdown.Done() @@ -210,10 +223,10 @@ func (ctl *Control) reader() { for { if m, err := msg.ReadMsg(encReader); err != nil { if err == io.EOF { - ctl.Debug("read from control connection EOF") + xl.Debug("read from control connection EOF") return } else { - ctl.Warn("read error: %v", err) + xl.Warn("read error: %v", err) ctl.conn.Close() return } @@ -225,20 +238,21 @@ func (ctl *Control) reader() { // writer writes messages got from sendCh to frps func (ctl *Control) writer() { + xl := ctl.xl defer ctl.writerShutdown.Done() encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.clientCfg.Token)) if err != nil { - ctl.conn.Error("crypto new writer error: %v", err) + xl.Error("crypto new writer error: %v", err) ctl.conn.Close() return } for { if m, ok := <-ctl.sendCh; !ok { - ctl.Info("control writer is closing") + xl.Info("control writer is closing") return } else { if err := msg.WriteMsg(encWriter, m); err != nil { - ctl.Warn("write message to control connection error: %v", err) + xl.Warn("write message to control connection error: %v", err) return } } @@ -247,10 +261,11 @@ func (ctl *Control) writer() { // msgHandler handles all channel events and do corresponding operations. func (ctl *Control) msgHandler() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.Error("panic error: %v", err) - ctl.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() defer ctl.msgHandlerShutdown.Done() @@ -266,11 +281,11 @@ func (ctl *Control) msgHandler() { select { case <-hbSend.C: // send heartbeat to server - ctl.Debug("send heartbeat to server") + xl.Debug("send heartbeat to server") ctl.sendCh <- &msg.Ping{} case <-hbCheck.C: if time.Since(ctl.lastPong) > time.Duration(ctl.clientCfg.HeartBeatTimeout)*time.Second { - ctl.Warn("heartbeat timeout") + xl.Warn("heartbeat timeout") // let reader() stop ctl.conn.Close() return @@ -287,7 +302,7 @@ func (ctl *Control) msgHandler() { ctl.HandleNewProxyResp(m) case *msg.Pong: ctl.lastPong = time.Now() - ctl.Debug("receive heartbeat from server") + xl.Debug("receive heartbeat from server") } } } diff --git a/client/health/health.go b/client/health/health.go index 91ea707..1277141 100644 --- a/client/health/health.go +++ b/client/health/health.go @@ -24,7 +24,7 @@ import ( "net/http" "time" - "github.com/fatedier/frp/utils/log" + "github.com/fatedier/frp/utils/xlog" ) var ( @@ -50,11 +50,11 @@ type HealthCheckMonitor struct { ctx context.Context cancel context.CancelFunc - - l log.Logger } -func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFailedTimes int, addr string, url string, +func NewHealthCheckMonitor(ctx context.Context, checkType string, + intervalS int, timeoutS int, maxFailedTimes int, + addr string, url string, statusNormalFn func(), statusFailedFn func()) *HealthCheckMonitor { if intervalS <= 0 { @@ -66,7 +66,7 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai if maxFailedTimes <= 0 { maxFailedTimes = 1 } - ctx, cancel := context.WithCancel(context.Background()) + newctx, cancel := context.WithCancel(ctx) return &HealthCheckMonitor{ checkType: checkType, interval: time.Duration(intervalS) * time.Second, @@ -77,15 +77,11 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai statusOK: false, statusNormalFn: statusNormalFn, statusFailedFn: statusFailedFn, - ctx: ctx, + ctx: newctx, cancel: cancel, } } -func (monitor *HealthCheckMonitor) SetLogger(l log.Logger) { - monitor.l = l -} - func (monitor *HealthCheckMonitor) Start() { go monitor.checkWorker() } @@ -95,6 +91,7 @@ func (monitor *HealthCheckMonitor) Stop() { } func (monitor *HealthCheckMonitor) checkWorker() { + xl := xlog.FromContextSafe(monitor.ctx) for { doCtx, cancel := context.WithDeadline(monitor.ctx, time.Now().Add(monitor.timeout)) err := monitor.doCheck(doCtx) @@ -109,25 +106,17 @@ func (monitor *HealthCheckMonitor) checkWorker() { } if err == nil { - if monitor.l != nil { - monitor.l.Trace("do one health check success") - } + xl.Trace("do one health check success") if !monitor.statusOK && monitor.statusNormalFn != nil { - if monitor.l != nil { - monitor.l.Info("health check status change to success") - } + xl.Info("health check status change to success") monitor.statusOK = true monitor.statusNormalFn() } } else { - if monitor.l != nil { - monitor.l.Warn("do one health check failed: %v", err) - } + xl.Warn("do one health check failed: %v", err) monitor.failedTimes++ if monitor.statusOK && int(monitor.failedTimes) >= monitor.maxFailedTimes && monitor.statusFailedFn != nil { - if monitor.l != nil { - monitor.l.Warn("health check status change to failed") - } + xl.Warn("health check status change to failed") monitor.statusOK = false monitor.statusFailedFn() } diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index f5e1b60..2da68ce 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -16,6 +16,7 @@ package proxy import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -29,8 +30,8 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/plugin" "github.com/fatedier/frp/models/proto/udp" - "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/errors" frpIo "github.com/fatedier/golib/io" @@ -44,17 +45,17 @@ type Proxy interface { Run() error // InWorkConn accept work connections registered to server. - InWorkConn(frpNet.Conn, *msg.StartWorkConn) + InWorkConn(net.Conn, *msg.StartWorkConn) Close() - log.Logger } -func NewProxy(pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) { +func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) { baseProxy := BaseProxy{ - Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName), clientCfg: clientCfg, serverUDPPort: serverUDPPort, + xl: xlog.FromContextSafe(ctx), + ctx: ctx, } switch cfg := pxyConf.(type) { case *config.TcpProxyConf: @@ -93,10 +94,12 @@ func NewProxy(pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serve type BaseProxy struct { closed bool - mu sync.RWMutex clientCfg config.ClientCommonConf serverUDPPort int - log.Logger + + mu sync.RWMutex + xl *xlog.Logger + ctx context.Context } // TCP @@ -123,8 +126,8 @@ func (pxy *TcpProxy) Close() { } } -func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, +func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, []byte(pxy.clientCfg.Token), m) } @@ -152,8 +155,8 @@ func (pxy *HttpProxy) Close() { } } -func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, +func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, []byte(pxy.clientCfg.Token), m) } @@ -181,8 +184,8 @@ func (pxy *HttpsProxy) Close() { } } -func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, +func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, []byte(pxy.clientCfg.Token), m) } @@ -210,8 +213,8 @@ func (pxy *StcpProxy) Close() { } } -func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, +func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, []byte(pxy.clientCfg.Token), m) } @@ -239,12 +242,13 @@ func (pxy *XtcpProxy) Close() { } } -func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { +func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + xl := pxy.xl defer conn.Close() var natHoleSidMsg msg.NatHoleSid err := msg.ReadMsgInto(conn, &natHoleSidMsg) if err != nil { - pxy.Error("xtcp read from workConn error: %v", err) + xl.Error("xtcp read from workConn error: %v", err) return } @@ -259,7 +263,7 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { err = msg.WriteMsg(clientConn, natHoleClientMsg) if err != nil { - pxy.Error("send natHoleClientMsg to server error: %v", err) + xl.Error("send natHoleClientMsg to server error: %v", err) return } @@ -270,28 +274,28 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { buf := pool.GetBuf(1024) n, err := clientConn.Read(buf) if err != nil { - pxy.Error("get natHoleRespMsg error: %v", err) + xl.Error("get natHoleRespMsg error: %v", err) return } err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg) if err != nil { - pxy.Error("get natHoleRespMsg error: %v", err) + xl.Error("get natHoleRespMsg error: %v", err) return } clientConn.SetReadDeadline(time.Time{}) clientConn.Close() if natHoleRespMsg.Error != "" { - pxy.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error) + xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error) return } - pxy.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr) + xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr) // Send detect message array := strings.Split(natHoleRespMsg.VisitorAddr, ":") if len(array) <= 1 { - pxy.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr) + xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr) } laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String()) /* @@ -301,18 +305,18 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { */ port, err := strconv.ParseInt(array[1], 10, 64) if err != nil { - pxy.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr) + xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr) return } pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid)) - pxy.Trace("send all detect msg done") + xl.Trace("send all detect msg done") msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{}) // Listen for clientConn's address and wait for visitor connection lConn, err := net.ListenUDP("udp", laddr) if err != nil { - pxy.Error("listen on visitorConn's local adress error: %v", err) + xl.Error("listen on visitorConn's local adress error: %v", err) return } defer lConn.Close() @@ -322,22 +326,22 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { var uAddr *net.UDPAddr n, uAddr, err = lConn.ReadFromUDP(sidBuf) if err != nil { - pxy.Warn("get sid from visitor error: %v", err) + xl.Warn("get sid from visitor error: %v", err) return } lConn.SetReadDeadline(time.Time{}) if string(sidBuf[:n]) != natHoleRespMsg.Sid { - pxy.Warn("incorrect sid from visitor") + xl.Warn("incorrect sid from visitor") return } pool.PutBuf(sidBuf) - pxy.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid) + xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid) lConn.WriteToUDP(sidBuf[:n], uAddr) kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr) if err != nil { - pxy.Error("create kcp connection from udp connection error: %v", err) + xl.Error("create kcp connection from udp connection error: %v", err) return } @@ -346,18 +350,18 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { fmuxCfg.LogOutput = ioutil.Discard sess, err := fmux.Server(kcpConn, fmuxCfg) if err != nil { - pxy.Error("create yamux server from kcp connection error: %v", err) + xl.Error("create yamux server from kcp connection error: %v", err) return } defer sess.Close() muxConn, err := sess.Accept() if err != nil { - pxy.Error("accept for yamux connection error: %v", err) + xl.Error("accept for yamux connection error: %v", err) return } - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, - frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, + muxConn, []byte(pxy.cfg.Sk), m) } func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) { @@ -390,7 +394,7 @@ type UdpProxy struct { // include msg.UdpPacket and msg.Ping sendCh chan msg.Message - workConn frpNet.Conn + workConn net.Conn } func (pxy *UdpProxy) Run() (err error) { @@ -419,8 +423,9 @@ func (pxy *UdpProxy) Close() { } } -func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { - pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String()) +func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + xl := pxy.xl + xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String()) // close resources releated with old workConn pxy.Close() @@ -435,32 +440,32 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { for { var udpMsg msg.UdpPacket if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { - pxy.Warn("read from workConn for udp error: %v", errRet) + xl.Warn("read from workConn for udp error: %v", errRet) return } if errRet := errors.PanicToError(func() { - pxy.Trace("get udp package from workConn: %s", udpMsg.Content) + xl.Trace("get udp package from workConn: %s", udpMsg.Content) readCh <- &udpMsg }); errRet != nil { - pxy.Info("reader goroutine for udp work connection closed: %v", errRet) + xl.Info("reader goroutine for udp work connection closed: %v", errRet) return } } } workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { defer func() { - pxy.Info("writer goroutine for udp work connection closed") + xl.Info("writer goroutine for udp work connection closed") }() var errRet error for rawMsg := range sendCh { switch m := rawMsg.(type) { case *msg.UdpPacket: - pxy.Trace("send udp package to workConn: %s", m.Content) + xl.Trace("send udp package to workConn: %s", m.Content) case *msg.Ping: - pxy.Trace("send ping message to udp workConn") + xl.Trace("send ping message to udp workConn") } if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { - pxy.Error("udp work write error: %v", errRet) + xl.Error("udp work write error: %v", errRet) return } } @@ -472,7 +477,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { if errRet = errors.PanicToError(func() { sendCh <- &msg.Ping{} }); errRet != nil { - pxy.Trace("heartbeat goroutine for udp work connection closed") + xl.Trace("heartbeat goroutine for udp work connection closed") break } } @@ -485,20 +490,22 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { } // Common handler for tcp work connections. -func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, - baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte, m *msg.StartWorkConn) { - +func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, + baseInfo *config.BaseProxyConf, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { + xl := xlog.FromContextSafe(ctx) var ( remote io.ReadWriteCloser err error ) remote = workConn + xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t", + baseInfo.UseEncryption, baseInfo.UseCompression) if baseInfo.UseEncryption { remote, err = frpIo.WithEncryption(remote, encKey) if err != nil { workConn.Close() - workConn.Error("create encryption stream error: %v", err) + xl.Error("create encryption stream error: %v", err) return } } @@ -541,19 +548,19 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. if proxyPlugin != nil { // if plugin is set, let plugin handle connections first - workConn.Debug("handle by plugin: %s", proxyPlugin.Name()) + xl.Debug("handle by plugin: %s", proxyPlugin.Name()) proxyPlugin.Handle(remote, workConn, extraInfo) - workConn.Debug("handle by plugin finished") + xl.Debug("handle by plugin finished") return } else { localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) if err != nil { workConn.Close() - workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) + xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) return } - workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), + xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) if len(extraInfo) > 0 { @@ -561,6 +568,6 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. } frpIo.Join(localConn, remote) - workConn.Debug("join connections closed") + xl.Debug("join connections closed") } } diff --git a/client/proxy/proxy_manager.go b/client/proxy/proxy_manager.go index 1d0d878..a0afd70 100644 --- a/client/proxy/proxy_manager.go +++ b/client/proxy/proxy_manager.go @@ -1,14 +1,15 @@ package proxy import ( + "context" "fmt" + "net" "sync" "github.com/fatedier/frp/client/event" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/log" - frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/errors" ) @@ -25,19 +26,17 @@ type ProxyManager struct { // The UDP port that the server is listening on serverUDPPort int - logPrefix string - log.Logger + ctx context.Context } -func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string, clientCfg config.ClientCommonConf, serverUDPPort int) *ProxyManager { +func NewProxyManager(ctx context.Context, msgSendCh chan (msg.Message), clientCfg config.ClientCommonConf, serverUDPPort int) *ProxyManager { return &ProxyManager{ - proxies: make(map[string]*ProxyWrapper), sendCh: msgSendCh, + proxies: make(map[string]*ProxyWrapper), closed: false, clientCfg: clientCfg, serverUDPPort: serverUDPPort, - logPrefix: logPrefix, - Logger: log.NewPrefixLogger(logPrefix), + ctx: ctx, } } @@ -65,7 +64,7 @@ func (pm *ProxyManager) Close() { pm.proxies = make(map[string]*ProxyWrapper) } -func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn, m *msg.StartWorkConn) { +func (pm *ProxyManager) HandleWorkConn(name string, workConn net.Conn, m *msg.StartWorkConn) { pm.mu.RLock() pw, ok := pm.proxies[name] pm.mu.RUnlock() @@ -104,6 +103,7 @@ func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { } func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) { + xl := xlog.FromContextSafe(pm.ctx) pm.mu.Lock() defer pm.mu.Unlock() @@ -127,13 +127,13 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) { } } if len(delPxyNames) > 0 { - pm.Info("proxy removed: %v", delPxyNames) + xl.Info("proxy removed: %v", delPxyNames) } addPxyNames := make([]string, 0) for name, cfg := range pxyCfgs { if _, ok := pm.proxies[name]; !ok { - pxy := NewProxyWrapper(cfg, pm.clientCfg, pm.HandleEvent, pm.logPrefix, pm.serverUDPPort) + pxy := NewProxyWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.serverUDPPort) pm.proxies[name] = pxy addPxyNames = append(addPxyNames, name) @@ -141,6 +141,6 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) { } } if len(addPxyNames) > 0 { - pm.Info("proxy added: %v", addPxyNames) + xl.Info("proxy added: %v", addPxyNames) } } diff --git a/client/proxy/proxy_wrapper.go b/client/proxy/proxy_wrapper.go index b02e529..458fa43 100644 --- a/client/proxy/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -1,7 +1,9 @@ package proxy import ( + "context" "fmt" + "net" "sync" "sync/atomic" "time" @@ -10,8 +12,7 @@ import ( "github.com/fatedier/frp/client/health" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/log" - frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/errors" ) @@ -62,11 +63,13 @@ type ProxyWrapper struct { healthNotifyCh chan struct{} mu sync.RWMutex - log.Logger + xl *xlog.Logger + ctx context.Context } -func NewProxyWrapper(cfg config.ProxyConf, clientCfg config.ClientCommonConf, eventHandler event.EventHandler, logPrefix string, serverUDPPort int) *ProxyWrapper { +func NewProxyWrapper(ctx context.Context, cfg config.ProxyConf, clientCfg config.ClientCommonConf, eventHandler event.EventHandler, serverUDPPort int) *ProxyWrapper { baseInfo := cfg.GetBaseInfo() + xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName) pw := &ProxyWrapper{ ProxyStatus: ProxyStatus{ Name: baseInfo.ProxyName, @@ -77,20 +80,19 @@ func NewProxyWrapper(cfg config.ProxyConf, clientCfg config.ClientCommonConf, ev closeCh: make(chan struct{}), healthNotifyCh: make(chan struct{}), handler: eventHandler, - Logger: log.NewPrefixLogger(logPrefix), + xl: xl, + ctx: xlog.NewContext(ctx, xl), } - pw.AddLogPrefix(pw.Name) if baseInfo.HealthCheckType != "" { pw.health = 1 // means failed - pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, + pw.monitor = health.NewHealthCheckMonitor(pw.ctx, baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr, baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback) - pw.monitor.SetLogger(pw.Logger) - pw.Trace("enable health check monitor") + xl.Trace("enable health check monitor") } - pw.pxy = NewProxy(pw.Cfg, clientCfg, serverUDPPort) + pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, serverUDPPort) return pw } @@ -147,6 +149,7 @@ func (pw *ProxyWrapper) Stop() { } func (pw *ProxyWrapper) checkWorker() { + xl := pw.xl if pw.monitor != nil { // let monitor do check request first time.Sleep(500 * time.Millisecond) @@ -161,7 +164,7 @@ func (pw *ProxyWrapper) checkWorker() { (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) || (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) { - pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart) + xl.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart) pw.Status = ProxyStatusWaitStart var newProxyMsg msg.NewProxy @@ -180,7 +183,7 @@ func (pw *ProxyWrapper) checkWorker() { ProxyName: pw.Name, }, }) - pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed) + xl.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed) pw.Status = ProxyStatusCheckFailed } pw.mu.Unlock() @@ -196,6 +199,7 @@ func (pw *ProxyWrapper) checkWorker() { } func (pw *ProxyWrapper) statusNormalCallback() { + xl := pw.xl atomic.StoreUint32(&pw.health, 0) errors.PanicToError(func() { select { @@ -203,10 +207,11 @@ func (pw *ProxyWrapper) statusNormalCallback() { default: } }) - pw.Info("health check success") + xl.Info("health check success") } func (pw *ProxyWrapper) statusFailedCallback() { + xl := pw.xl atomic.StoreUint32(&pw.health, 1) errors.PanicToError(func() { select { @@ -214,15 +219,16 @@ func (pw *ProxyWrapper) statusFailedCallback() { default: } }) - pw.Info("health check failed") + xl.Info("health check failed") } -func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn, m *msg.StartWorkConn) { +func (pw *ProxyWrapper) InWorkConn(workConn net.Conn, m *msg.StartWorkConn) { + xl := pw.xl pw.mu.RLock() pxy := pw.pxy pw.mu.RUnlock() if pxy != nil { - workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + xl.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) go pxy.InWorkConn(workConn, m) } else { workConn.Close() diff --git a/client/service.go b/client/service.go index 9f86ec7..095df0a 100644 --- a/client/service.go +++ b/client/service.go @@ -15,9 +15,11 @@ package client import ( + "context" "crypto/tls" "fmt" "io/ioutil" + "net" "runtime" "sync" "sync/atomic" @@ -30,6 +32,7 @@ import ( frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" + "github.com/fatedier/frp/utils/xlog" fmux "github.com/hashicorp/yamux" ) @@ -55,19 +58,25 @@ type Service struct { // This is configured by the login response from frps serverUDPPort int - exit uint32 // 0 means not exit - closedCh chan int + exit uint32 // 0 means not exit + + // service context + ctx context.Context + // call cancel to stop service + cancel context.CancelFunc } -// NewService creates a new client service with the given configuration. func NewService(cfg config.ClientCommonConf, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf, cfgFile string) (svr *Service, err error) { + + ctx, cancel := context.WithCancel(context.Background()) svr = &Service{ cfg: cfg, cfgFile: cfgFile, pxyCfgs: pxyCfgs, visitorCfgs: visitorCfgs, exit: 0, - closedCh: make(chan int), + ctx: xlog.NewContext(ctx, xlog.New()), + cancel: cancel, } return } @@ -79,11 +88,13 @@ func (svr *Service) GetController() *Control { } func (svr *Service) Run() error { - // first login + xl := xlog.FromContextSafe(svr.ctx) + + // login to frps for { conn, session, err := svr.login() if err != nil { - log.Warn("login to server failed: %v", err) + xl.Warn("login to server failed: %v", err) // if login_fail_exit is true, just exit this program // otherwise sleep a while and try again to connect to server @@ -94,7 +105,7 @@ func (svr *Service) Run() error { } } else { // login success - ctl := NewControl(svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort) + ctl := NewControl(svr.ctx, svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort) ctl.Run() svr.ctlMu.Lock() svr.ctl = ctl @@ -118,12 +129,12 @@ func (svr *Service) Run() error { } log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort) } - - <-svr.closedCh + <-svr.ctx.Done() return nil } func (svr *Service) keepControllerWorking() { + xl := xlog.FromContextSafe(svr.ctx) maxDelayTime := 20 * time.Second delayTime := time.Second @@ -134,10 +145,10 @@ func (svr *Service) keepControllerWorking() { } for { - log.Info("try to reconnect to server...") + xl.Info("try to reconnect to server...") conn, session, err := svr.login() if err != nil { - log.Warn("reconnect to server error: %v", err) + xl.Warn("reconnect to server error: %v", err) time.Sleep(delayTime) delayTime = delayTime * 2 if delayTime > maxDelayTime { @@ -148,7 +159,7 @@ func (svr *Service) keepControllerWorking() { // reconnect success, init delayTime delayTime = time.Second - ctl := NewControl(svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort) + ctl := NewControl(svr.ctx, svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort) ctl.Run() svr.ctlMu.Lock() svr.ctl = ctl @@ -161,7 +172,8 @@ func (svr *Service) keepControllerWorking() { // login creates a connection to frps and registers it self as a client // conn: control connection // session: if it's not nil, using tcp mux -func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error) { +func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) { + xl := xlog.FromContextSafe(svr.ctx) var tlsConfig *tls.Config if svr.cfg.TLSEnable { tlsConfig = &tls.Config{ @@ -197,7 +209,7 @@ func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error) err = errRet return } - conn = frpNet.WrapConn(stream) + conn = stream } now := time.Now().Unix() @@ -225,13 +237,16 @@ func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error) if loginRespMsg.Error != "" { err = fmt.Errorf("%s", loginRespMsg.Error) - log.Error("%s", loginRespMsg.Error) + xl.Error("%s", loginRespMsg.Error) return } svr.runId = loginRespMsg.RunId + xl.ResetPrefixes() + xl.AppendPrefix(svr.runId) + svr.serverUDPPort = loginRespMsg.ServerUdpPort - log.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort) + xl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort) return } @@ -247,5 +262,5 @@ func (svr *Service) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs func (svr *Service) Close() { atomic.StoreUint32(&svr.exit, 1) svr.ctl.Close() - close(svr.closedCh) + svr.cancel() } diff --git a/client/visitor.go b/client/visitor.go index c1ee004..a4900e0 100644 --- a/client/visitor.go +++ b/client/visitor.go @@ -16,6 +16,7 @@ package client import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -25,9 +26,9 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" + "github.com/fatedier/frp/utils/xlog" frpIo "github.com/fatedier/golib/io" "github.com/fatedier/golib/pool" @@ -38,13 +39,13 @@ import ( type Visitor interface { Run() error Close() - log.Logger } -func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) { +func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visitor Visitor) { + xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName) baseVisitor := BaseVisitor{ - ctl: ctl, - Logger: log.NewPrefixLogger(cfg.GetBaseInfo().ProxyName), + ctl: ctl, + ctx: xlog.NewContext(ctx, xl), } switch cfg := cfg.(type) { case *config.StcpVisitorConf: @@ -63,10 +64,11 @@ func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) { type BaseVisitor struct { ctl *Control - l frpNet.Listener + l net.Listener closed bool - mu sync.RWMutex - log.Logger + + mu sync.RWMutex + ctx context.Context } type StcpVisitor struct { @@ -76,7 +78,7 @@ type StcpVisitor struct { } func (sv *StcpVisitor) Run() (err error) { - sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort) + sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort)) if err != nil { return } @@ -90,10 +92,11 @@ func (sv *StcpVisitor) Close() { } func (sv *StcpVisitor) worker() { + xl := xlog.FromContextSafe(sv.ctx) for { conn, err := sv.l.Accept() if err != nil { - sv.Warn("stcp local listener closed") + xl.Warn("stcp local listener closed") return } @@ -101,10 +104,11 @@ func (sv *StcpVisitor) worker() { } } -func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) { +func (sv *StcpVisitor) handleConn(userConn net.Conn) { + xl := xlog.FromContextSafe(sv.ctx) defer userConn.Close() - sv.Debug("get a new stcp user connection") + xl.Debug("get a new stcp user connection") visitorConn, err := sv.ctl.connectServer() if err != nil { return @@ -121,7 +125,7 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) { } err = msg.WriteMsg(visitorConn, newVisitorConnMsg) if err != nil { - sv.Warn("send newVisitorConnMsg to server error: %v", err) + xl.Warn("send newVisitorConnMsg to server error: %v", err) return } @@ -129,13 +133,13 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) { visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second)) err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg) if err != nil { - sv.Warn("get newVisitorConnRespMsg error: %v", err) + xl.Warn("get newVisitorConnRespMsg error: %v", err) return } visitorConn.SetReadDeadline(time.Time{}) if newVisitorConnRespMsg.Error != "" { - sv.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error) + xl.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error) return } @@ -144,7 +148,7 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) { if sv.cfg.UseEncryption { remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk)) if err != nil { - sv.Error("create encryption stream error: %v", err) + xl.Error("create encryption stream error: %v", err) return } } @@ -163,7 +167,7 @@ type XtcpVisitor struct { } func (sv *XtcpVisitor) Run() (err error) { - sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort) + sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort)) if err != nil { return } @@ -177,10 +181,11 @@ func (sv *XtcpVisitor) Close() { } func (sv *XtcpVisitor) worker() { + xl := xlog.FromContextSafe(sv.ctx) for { conn, err := sv.l.Accept() if err != nil { - sv.Warn("xtcp local listener closed") + xl.Warn("xtcp local listener closed") return } @@ -188,25 +193,26 @@ func (sv *XtcpVisitor) worker() { } } -func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { +func (sv *XtcpVisitor) handleConn(userConn net.Conn) { + xl := xlog.FromContextSafe(sv.ctx) defer userConn.Close() - sv.Debug("get a new xtcp user connection") + xl.Debug("get a new xtcp user connection") if sv.ctl.serverUDPPort == 0 { - sv.Error("xtcp is not supported by server") + xl.Error("xtcp is not supported by server") return } raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", sv.ctl.clientCfg.ServerAddr, sv.ctl.serverUDPPort)) if err != nil { - sv.Error("resolve server UDP addr error") + xl.Error("resolve server UDP addr error") return } visitorConn, err := net.DialUDP("udp", nil, raddr) if err != nil { - sv.Warn("dial server udp addr error: %v", err) + xl.Warn("dial server udp addr error: %v", err) return } defer visitorConn.Close() @@ -219,7 +225,7 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { } err = msg.WriteMsg(visitorConn, natHoleVisitorMsg) if err != nil { - sv.Warn("send natHoleVisitorMsg to server error: %v", err) + xl.Warn("send natHoleVisitorMsg to server error: %v", err) return } @@ -229,24 +235,24 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { buf := pool.GetBuf(1024) n, err := visitorConn.Read(buf) if err != nil { - sv.Warn("get natHoleRespMsg error: %v", err) + xl.Warn("get natHoleRespMsg error: %v", err) return } err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg) if err != nil { - sv.Warn("get natHoleRespMsg error: %v", err) + xl.Warn("get natHoleRespMsg error: %v", err) return } visitorConn.SetReadDeadline(time.Time{}) pool.PutBuf(buf) if natHoleRespMsg.Error != "" { - sv.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error) + xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error) return } - sv.Trace("get natHoleRespMsg, sid [%s], client address [%s], visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr) + xl.Trace("get natHoleRespMsg, sid [%s], client address [%s], visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr) // Close visitorConn, so we can use it's local address. visitorConn.Close() @@ -255,12 +261,12 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { laddr, _ := net.ResolveUDPAddr("udp", visitorConn.LocalAddr().String()) daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.ClientAddr) if err != nil { - sv.Error("resolve client udp address error: %v", err) + xl.Error("resolve client udp address error: %v", err) return } lConn, err := net.DialUDP("udp", laddr, daddr) if err != nil { - sv.Error("dial client udp address error: %v", err) + xl.Error("dial client udp address error: %v", err) return } defer lConn.Close() @@ -272,23 +278,23 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { lConn.SetReadDeadline(time.Now().Add(8 * time.Second)) n, err = lConn.Read(sidBuf) if err != nil { - sv.Warn("get sid from client error: %v", err) + xl.Warn("get sid from client error: %v", err) return } lConn.SetReadDeadline(time.Time{}) if string(sidBuf[:n]) != natHoleRespMsg.Sid { - sv.Warn("incorrect sid from client") + xl.Warn("incorrect sid from client") return } pool.PutBuf(sidBuf) - sv.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid) + xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid) // wrap kcp connection var remote io.ReadWriteCloser remote, err = frpNet.NewKcpConnFromUdp(lConn, true, natHoleRespMsg.ClientAddr) if err != nil { - sv.Error("create kcp connection from udp connection error: %v", err) + xl.Error("create kcp connection from udp connection error: %v", err) return } @@ -297,13 +303,13 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { fmuxCfg.LogOutput = ioutil.Discard sess, err := fmux.Client(remote, fmuxCfg) if err != nil { - sv.Error("create yamux session error: %v", err) + xl.Error("create yamux session error: %v", err) return } defer sess.Close() muxConn, err := sess.Open() if err != nil { - sv.Error("open yamux stream error: %v", err) + xl.Error("open yamux stream error: %v", err) return } @@ -311,7 +317,7 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { if sv.cfg.UseEncryption { muxConnRWCloser, err = frpIo.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk)) if err != nil { - sv.Error("create encryption stream error: %v", err) + xl.Error("create encryption stream error: %v", err) return } } @@ -320,5 +326,5 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) { } frpIo.Join(userConn, muxConnRWCloser) - sv.Debug("join connections closed") + xl.Debug("join connections closed") } diff --git a/client/visitor_manager.go b/client/visitor_manager.go index b223d55..3b9351e 100644 --- a/client/visitor_manager.go +++ b/client/visitor_manager.go @@ -15,11 +15,12 @@ package client import ( + "context" "sync" "time" "github.com/fatedier/frp/models/config" - "github.com/fatedier/frp/utils/log" + "github.com/fatedier/frp/utils/xlog" ) type VisitorManager struct { @@ -30,26 +31,29 @@ type VisitorManager struct { checkInterval time.Duration - mu sync.Mutex + mu sync.Mutex + ctx context.Context } -func NewVisitorManager(ctl *Control) *VisitorManager { +func NewVisitorManager(ctx context.Context, ctl *Control) *VisitorManager { return &VisitorManager{ ctl: ctl, cfgs: make(map[string]config.VisitorConf), visitors: make(map[string]Visitor), checkInterval: 10 * time.Second, + ctx: ctx, } } func (vm *VisitorManager) Run() { + xl := xlog.FromContextSafe(vm.ctx) for { time.Sleep(vm.checkInterval) vm.mu.Lock() for _, cfg := range vm.cfgs { name := cfg.GetBaseInfo().ProxyName if _, exist := vm.visitors[name]; !exist { - log.Info("try to start visitor [%s]", name) + xl.Info("try to start visitor [%s]", name) vm.startVisitor(cfg) } } @@ -59,19 +63,21 @@ func (vm *VisitorManager) Run() { // Hold lock before calling this function. func (vm *VisitorManager) startVisitor(cfg config.VisitorConf) (err error) { + xl := xlog.FromContextSafe(vm.ctx) name := cfg.GetBaseInfo().ProxyName - visitor := NewVisitor(vm.ctl, cfg) + visitor := NewVisitor(vm.ctx, vm.ctl, cfg) err = visitor.Run() if err != nil { - visitor.Warn("start error: %v", err) + xl.Warn("start error: %v", err) } else { vm.visitors[name] = visitor - visitor.Info("start visitor success") + xl.Info("start visitor success") } return } func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { + xl := xlog.FromContextSafe(vm.ctx) vm.mu.Lock() defer vm.mu.Unlock() @@ -97,7 +103,7 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { } } if len(delNames) > 0 { - log.Info("visitor removed: %v", delNames) + xl.Info("visitor removed: %v", delNames) } addNames := make([]string, 0) @@ -109,7 +115,7 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { } } if len(addNames) > 0 { - log.Info("visitor added: %v", addNames) + xl.Info("visitor added: %v", addNames) } return } diff --git a/cmd/frps/root.go b/cmd/frps/root.go index fd6cdbd..ec175fe 100644 --- a/cmd/frps/root.go +++ b/cmd/frps/root.go @@ -202,7 +202,7 @@ func runServer(cfg config.ServerCommonConf) (err error) { if err != nil { return err } - log.Info("Start frps success") + log.Info("start frps success") svr.Run() return } diff --git a/go.sum b/go.sum index 5ddda4e..b9bdf2a 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,10 @@ -github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb h1:wCrNShQidLmvVWn/0PikGmpdP0vtQmnvyRg3ZBEhczw= github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk= github.com/fatedier/golib v0.0.0-20181107124048-ff8cd814b049 h1:teH578mf2ii42NHhIp3PhgvjU5bv+NFMq9fSQR8NaG8= github.com/fatedier/golib v0.0.0-20181107124048-ff8cd814b049/go.mod h1:DqIrnl0rp3Zybg9zbJmozTy1n8fYJoX+QoAj9slIkKM= github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible h1:ssXat9YXFvigNge/IkkZvFMn8yeYKFX+uI6wn2mLJ74= github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible/go.mod h1:YpCOaxj7vvMThhIQ9AfTOPW2sfztQR5WDfs7AflSy4s= -github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -26,28 +22,18 @@ github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/ github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHXTdpuGw+RpnJtzUcCb/oRKZP65pBy9pr8= github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY= -github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rakyll/statik v0.1.1 h1:fCLHsIMajHqD5RKigbFXpvX3dN7c80Pm12+NCrI3kvg= github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= -github.com/rodaine/table v1.0.0 h1:UaCJG5Axc/cNXVGXqnCrffm1KxP0OfYLe1HuJLf5sFY= github.com/rodaine/table v1.0.0/go.mod h1:YAUzwPOji0DUJNEvggdxyQcUAl4g3hDRcFlyjnnR51I= -github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= -github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/templexxx/cpufeat v0.0.0-20170927014610-3794dfbfb047 h1:K+jtWCOuZgCra7eXZ/VWn2FbJmrA/D058mTXhh2rq+8= github.com/templexxx/cpufeat v0.0.0-20170927014610-3794dfbfb047/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU= -github.com/templexxx/xor v0.0.0-20170926022130-0af8e873c554 h1:pexgSe+JCFuxG+uoMZLO+ce8KHtdHGhst4cs6rw3gmk= github.com/templexxx/xor v0.0.0-20170926022130-0af8e873c554/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= -github.com/tjfoc/gmsm v0.0.0-20171124023159-98aa888b79d8 h1:6CNSDqI1wiE+JqyOy5Qt/yo/DoNI2/QmmOZeiCid2Nw= github.com/tjfoc/gmsm v0.0.0-20171124023159-98aa888b79d8/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc= -github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec h1:DGmKwyZwEB8dI7tbLt/I/gQuP559o/0FrAkHKlQM/Ks= github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec/go.mod h1:owBmyHYMLkxyrugmfwE/DLJyW8Ro9mkphwuVErQ0iUw= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= diff --git a/models/plugin/http_proxy.go b/models/plugin/http_proxy.go index 3afa2cb..b0ca923 100644 --- a/models/plugin/http_proxy.go +++ b/models/plugin/http_proxy.go @@ -64,7 +64,7 @@ func (hp *HttpProxy) Name() string { return PluginHttpProxy } -func (hp *HttpProxy) Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) { +func (hp *HttpProxy) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) sc, rd := gnet.NewSharedConn(wrapConn) diff --git a/models/plugin/https2http.go b/models/plugin/https2http.go index f840ebd..6554035 100644 --- a/models/plugin/https2http.go +++ b/models/plugin/https2http.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "fmt" "io" + "net" "net/http" "net/http/httputil" "strings" @@ -115,7 +116,7 @@ func (p *HTTPS2HTTPPlugin) genTLSConfig() (*tls.Config, error) { return config, nil } -func (p *HTTPS2HTTPPlugin) Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) { +func (p *HTTPS2HTTPPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) p.l.PutConn(wrapConn) } diff --git a/models/plugin/plugin.go b/models/plugin/plugin.go index cfad551..6850919 100644 --- a/models/plugin/plugin.go +++ b/models/plugin/plugin.go @@ -20,8 +20,6 @@ import ( "net" "sync" - frpNet "github.com/fatedier/frp/utils/net" - "github.com/fatedier/golib/errors" ) @@ -46,7 +44,9 @@ func Create(name string, params map[string]string) (p Plugin, err error) { type Plugin interface { Name() string - Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) + + // extraBufToLocal will send to local connection first, then join conn with local connection + Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) Close() error } diff --git a/models/plugin/socks5.go b/models/plugin/socks5.go index 447602a..dc08934 100644 --- a/models/plugin/socks5.go +++ b/models/plugin/socks5.go @@ -18,6 +18,7 @@ import ( "io" "io/ioutil" "log" + "net" frpNet "github.com/fatedier/frp/utils/net" @@ -53,7 +54,7 @@ func NewSocks5Plugin(params map[string]string) (p Plugin, err error) { return } -func (sp *Socks5Plugin) Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) { +func (sp *Socks5Plugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { defer conn.Close() wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) sp.Server.ServeConn(wrapConn) diff --git a/models/plugin/static_file.go b/models/plugin/static_file.go index 080ff74..3232531 100644 --- a/models/plugin/static_file.go +++ b/models/plugin/static_file.go @@ -16,6 +16,7 @@ package plugin import ( "io" + "net" "net/http" frpNet "github.com/fatedier/frp/utils/net" @@ -72,7 +73,7 @@ func NewStaticFilePlugin(params map[string]string) (Plugin, error) { return sp, nil } -func (sp *StaticFilePlugin) Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) { +func (sp *StaticFilePlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) sp.l.PutConn(wrapConn) } diff --git a/models/plugin/unix_domain_socket.go b/models/plugin/unix_domain_socket.go index 86833e2..a85ada7 100644 --- a/models/plugin/unix_domain_socket.go +++ b/models/plugin/unix_domain_socket.go @@ -19,8 +19,6 @@ import ( "io" "net" - frpNet "github.com/fatedier/frp/utils/net" - frpIo "github.com/fatedier/golib/io" ) @@ -53,7 +51,7 @@ func NewUnixDomainSocketPlugin(params map[string]string) (p Plugin, err error) { return } -func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser, realConn frpNet.Conn, extraBufToLocal []byte) { +func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { localConn, err := net.DialUnix("unix", nil, uds.UnixAddr) if err != nil { return diff --git a/server/control.go b/server/control.go index bb802a7..0db6198 100644 --- a/server/control.go +++ b/server/control.go @@ -15,8 +15,10 @@ package server import ( + "context" "fmt" "io" + "net" "runtime/debug" "sync" "time" @@ -28,8 +30,8 @@ import ( "github.com/fatedier/frp/server/controller" "github.com/fatedier/frp/server/proxy" "github.com/fatedier/frp/server/stats" - "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" @@ -131,9 +133,12 @@ type Control struct { // Server configuration information serverCfg config.ServerCommonConf + + xl *xlog.Logger + ctx context.Context } -func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager, +func NewControl(ctx context.Context, rc *controller.ResourceController, pxyManager *proxy.ProxyManager, statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login, serverCfg config.ServerCommonConf) *Control { @@ -161,6 +166,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage managerShutdown: shutdown.New(), allShutdown: shutdown.New(), serverCfg: serverCfg, + xl: xlog.FromContextSafe(ctx), + ctx: ctx, } } @@ -185,18 +192,19 @@ func (ctl *Control) Start() { } func (ctl *Control) RegisterWorkConn(conn net.Conn) { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() select { case ctl.workConnCh <- conn: - ctl.conn.Debug("new work connection registered") + xl.Debug("new work connection registered") default: - ctl.conn.Debug("work connection pool is full, discarding") + xl.Debug("work connection pool is full, discarding") conn.Close() } } @@ -206,10 +214,11 @@ func (ctl *Control) RegisterWorkConn(conn net.Conn) { // and wait until it is available. // return an error if wait timeout func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() @@ -221,14 +230,14 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) { err = frpErr.ErrCtlClosed return } - ctl.conn.Debug("get work connection from pool") + xl.Debug("get work connection from pool") default: // no work connections available in the poll, send message to frpc to get more err = errors.PanicToError(func() { ctl.sendCh <- &msg.ReqWorkConn{} }) if err != nil { - ctl.conn.Error("%v", err) + xl.Error("%v", err) return } @@ -236,13 +245,13 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) { case workConn, ok = <-ctl.workConnCh: if !ok { err = frpErr.ErrCtlClosed - ctl.conn.Warn("no work connections avaiable, %v", err) + xl.Warn("no work connections avaiable, %v", err) return } case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second): err = fmt.Errorf("timeout trying to get work connection") - ctl.conn.Warn("%v", err) + xl.Warn("%v", err) return } } @@ -255,16 +264,18 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) { } func (ctl *Control) Replaced(newCtl *Control) { - ctl.conn.Info("Replaced by client [%s]", newCtl.runId) + xl := ctl.xl + xl.Info("Replaced by client [%s]", newCtl.runId) ctl.runId = "" ctl.allShutdown.Start() } func (ctl *Control) writer() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() @@ -273,17 +284,17 @@ func (ctl *Control) writer() { encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.serverCfg.Token)) if err != nil { - ctl.conn.Error("crypto new writer error: %v", err) + xl.Error("crypto new writer error: %v", err) ctl.allShutdown.Start() return } for { if m, ok := <-ctl.sendCh; !ok { - ctl.conn.Info("control writer is closing") + xl.Info("control writer is closing") return } else { if err := msg.WriteMsg(encWriter, m); err != nil { - ctl.conn.Warn("write message to control connection error: %v", err) + xl.Warn("write message to control connection error: %v", err) return } } @@ -291,10 +302,11 @@ func (ctl *Control) writer() { } func (ctl *Control) reader() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() @@ -305,10 +317,10 @@ func (ctl *Control) reader() { for { if m, err := msg.ReadMsg(encReader); err != nil { if err == io.EOF { - ctl.conn.Debug("control connection closed") + xl.Debug("control connection closed") return } else { - ctl.conn.Warn("read error: %v", err) + xl.Warn("read error: %v", err) ctl.conn.Close() return } @@ -319,10 +331,11 @@ func (ctl *Control) reader() { } func (ctl *Control) stoper() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() @@ -355,7 +368,7 @@ func (ctl *Control) stoper() { } ctl.allShutdown.Done() - ctl.conn.Info("client exit success") + xl.Info("client exit success") ctl.statsCollector.Mark(stats.TypeCloseClient, &stats.CloseClientPayload{}) } @@ -366,10 +379,11 @@ func (ctl *Control) WaitClosed() { } func (ctl *Control) manager() { + xl := ctl.xl defer func() { if err := recover(); err != nil { - ctl.conn.Error("panic error: %v", err) - ctl.conn.Error(string(debug.Stack())) + xl.Error("panic error: %v", err) + xl.Error(string(debug.Stack())) } }() @@ -383,7 +397,7 @@ func (ctl *Control) manager() { select { case <-heartbeat.C: if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.HeartBeatTimeout)*time.Second { - ctl.conn.Warn("heartbeat timeout") + xl.Warn("heartbeat timeout") return } case rawMsg, ok := <-ctl.readCh: @@ -400,10 +414,10 @@ func (ctl *Control) manager() { } if err != nil { resp.Error = err.Error() - ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err) + xl.Warn("new proxy [%s] error: %v", m.ProxyName, err) } else { resp.RemoteAddr = remoteAddr - ctl.conn.Info("new proxy [%s] success", m.ProxyName) + xl.Info("new proxy [%s] success", m.ProxyName) ctl.statsCollector.Mark(stats.TypeNewProxy, &stats.NewProxyPayload{ Name: m.ProxyName, ProxyType: m.ProxyType, @@ -412,10 +426,10 @@ func (ctl *Control) manager() { ctl.sendCh <- resp case *msg.CloseProxy: ctl.CloseProxy(m) - ctl.conn.Info("close proxy [%s] success", m.ProxyName) + xl.Info("close proxy [%s] success", m.ProxyName) case *msg.Ping: ctl.lastPing = time.Now() - ctl.conn.Debug("receive heartbeat") + xl.Debug("receive heartbeat") ctl.sendCh <- &msg.Pong{} } } @@ -432,7 +446,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err // NewProxy will return a interface Proxy. // In fact it create different proxies by different proxy type, we just call run() here. - pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg) + pxy, err := proxy.NewProxy(ctl.ctx, ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg) if err != nil { return remoteAddr, err } diff --git a/server/controller/visitor.go b/server/controller/visitor.go index ea8a53e..22ff694 100644 --- a/server/controller/visitor.go +++ b/server/controller/visitor.go @@ -17,6 +17,7 @@ package controller import ( "fmt" "io" + "net" "sync" frpNet "github.com/fatedier/frp/utils/net" @@ -55,7 +56,7 @@ func (vm *VisitorManager) Listen(name string, sk string) (l *frpNet.CustomListen return } -func (vm *VisitorManager) NewConn(name string, conn frpNet.Conn, timestamp int64, signKey string, +func (vm *VisitorManager) NewConn(name string, conn net.Conn, timestamp int64, signKey string, useEncryption bool, useCompression bool) (err error) { vm.mu.RLock() diff --git a/server/group/http.go b/server/group/http.go index 538dccf..3160968 100644 --- a/server/group/http.go +++ b/server/group/http.go @@ -2,11 +2,10 @@ package group import ( "fmt" + "net" "sync" "sync/atomic" - frpNet "github.com/fatedier/frp/utils/net" - "github.com/fatedier/frp/utils/vhost" ) @@ -131,7 +130,7 @@ func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) { return } -func (g *HTTPGroup) createConn(remoteAddr string) (frpNet.Conn, error) { +func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) { var f vhost.CreateConnFunc newIndex := atomic.AddUint64(&g.index, 1) diff --git a/server/proxy/http.go b/server/proxy/http.go index 5bd3139..09fb3e4 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -36,6 +36,7 @@ type HttpProxy struct { } func (pxy *HttpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl routeConfig := vhost.VhostRouteConfig{ RewriteHost: pxy.cfg.HostHeaderRewrite, Headers: pxy.cfg.Headers, @@ -88,7 +89,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) { }) } addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(pxy.serverCfg.VhostHttpPort))) - pxy.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group) + xl.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group) } } @@ -120,7 +121,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) { } addrs = append(addrs, util.CanonicalAddr(tmpDomain, pxy.serverCfg.VhostHttpPort)) - pxy.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group) + xl.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group) } } remoteAddr = strings.Join(addrs, ",") @@ -131,10 +132,11 @@ func (pxy *HttpProxy) GetConf() config.ProxyConf { return pxy.cfg } -func (pxy *HttpProxy) GetRealConn(remoteAddr string) (workConn frpNet.Conn, err error) { +func (pxy *HttpProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) { + xl := pxy.xl rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr) if errRet != nil { - pxy.Warn("resolve TCP addr [%s] error: %v", remoteAddr, errRet) + xl.Warn("resolve TCP addr [%s] error: %v", remoteAddr, errRet) // we do not return error here since remoteAddr is not necessary for proxies without proxy protocol enabled } @@ -148,7 +150,7 @@ func (pxy *HttpProxy) GetRealConn(remoteAddr string) (workConn frpNet.Conn, err if pxy.cfg.UseEncryption { rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token)) if err != nil { - pxy.Error("create encryption stream error: %v", err) + xl.Error("create encryption stream error: %v", err) return } } diff --git a/server/proxy/https.go b/server/proxy/https.go index 8784042..0191496 100644 --- a/server/proxy/https.go +++ b/server/proxy/https.go @@ -28,6 +28,7 @@ type HttpsProxy struct { } func (pxy *HttpsProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl routeConfig := &vhost.VhostRouteConfig{} defer func() { @@ -42,26 +43,24 @@ func (pxy *HttpsProxy) Run() (remoteAddr string, err error) { } routeConfig.Domain = domain - l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) + l, errRet := pxy.rc.VhostHttpsMuxer.Listen(pxy.ctx, routeConfig) if errRet != nil { err = errRet return } - l.AddLogPrefix(pxy.name) - pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) + xl.Info("https proxy listen for host [%s]", routeConfig.Domain) pxy.listeners = append(pxy.listeners, l) addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, pxy.serverCfg.VhostHttpsPort)) } if pxy.cfg.SubDomain != "" { routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost - l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) + l, errRet := pxy.rc.VhostHttpsMuxer.Listen(pxy.ctx, routeConfig) if errRet != nil { err = errRet return } - l.AddLogPrefix(pxy.name) - pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) + xl.Info("https proxy listen for host [%s]", routeConfig.Domain) pxy.listeners = append(pxy.listeners, l) addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(pxy.serverCfg.VhostHttpsPort))) } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 627b0e1..84a04e6 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -15,6 +15,7 @@ package proxy import ( + "context" "fmt" "io" "net" @@ -25,48 +26,54 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/server/controller" "github.com/fatedier/frp/server/stats" - "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" frpIo "github.com/fatedier/golib/io" ) -type GetWorkConnFn func() (frpNet.Conn, error) +type GetWorkConnFn func() (net.Conn, error) type Proxy interface { + Context() context.Context Run() (remoteAddr string, err error) GetName() string GetConf() config.ProxyConf - GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) + GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) GetUsedPortsNum() int Close() - log.Logger } type BaseProxy struct { name string rc *controller.ResourceController statsCollector stats.Collector - listeners []frpNet.Listener + listeners []net.Listener usedPortsNum int poolCount int getWorkConnFn GetWorkConnFn serverCfg config.ServerCommonConf - mu sync.RWMutex - log.Logger + mu sync.RWMutex + xl *xlog.Logger + ctx context.Context } func (pxy *BaseProxy) GetName() string { return pxy.name } +func (pxy *BaseProxy) Context() context.Context { + return pxy.ctx +} + func (pxy *BaseProxy) GetUsedPortsNum() int { return pxy.usedPortsNum } func (pxy *BaseProxy) Close() { - pxy.Info("proxy closing") + xl := xlog.FromContextSafe(pxy.ctx) + xl.Info("proxy closing") for _, l := range pxy.listeners { l.Close() } @@ -74,15 +81,17 @@ func (pxy *BaseProxy) Close() { // GetWorkConnFromPool try to get a new work connections from pool // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool -func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) { +func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) { + xl := xlog.FromContextSafe(pxy.ctx) // try all connections from the pool for i := 0; i < pxy.poolCount+1; i++ { if workConn, err = pxy.getWorkConnFn(); err != nil { - pxy.Warn("failed to get work connection: %v", err) + xl.Warn("failed to get work connection: %v", err) return } - pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) - workConn.AddLogPrefix(pxy.GetName()) + xl.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) + xl.Spawn().AppendPrefix(pxy.GetName()) + workConn = frpNet.NewContextConn(workConn, pxy.ctx) var ( srcAddr string @@ -109,7 +118,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Co DstPort: uint16(dstPort), }) if err != nil { - workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) + xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) workConn.Close() } else { break @@ -117,7 +126,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Co } if err != nil { - pxy.Error("try to get work connection failed in the end") + xl.Error("try to get work connection failed in the end") return } return @@ -126,36 +135,39 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Co // startListenHandler start a goroutine handler for each listener. // p: p will just be passed to handler(Proxy, frpNet.Conn). // handler: each proxy type can set different handler function to deal with connections accepted from listeners. -func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector, config.ServerCommonConf)) { +func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, stats.Collector, config.ServerCommonConf)) { + xl := xlog.FromContextSafe(pxy.ctx) for _, listener := range pxy.listeners { - go func(l frpNet.Listener) { + go func(l net.Listener) { for { // block // if listener is closed, err returned c, err := l.Accept() if err != nil { - pxy.Info("listener is closed") + xl.Info("listener is closed") return } - pxy.Debug("get a user connection [%s]", c.RemoteAddr().String()) + xl.Debug("get a user connection [%s]", c.RemoteAddr().String()) go handler(p, c, pxy.statsCollector, pxy.serverCfg) } }(listener) } } -func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int, +func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) { + xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName) basePxy := BaseProxy{ name: pxyConf.GetBaseInfo().ProxyName, rc: rc, statsCollector: statsCollector, - listeners: make([]frpNet.Listener, 0), + listeners: make([]net.Listener, 0), poolCount: poolCount, getWorkConnFn: getWorkConnFn, - Logger: log.NewPrefixLogger(runId), serverCfg: serverCfg, + xl: xl, + ctx: xlog.NewContext(ctx, xl), } switch cfg := pxyConf.(type) { case *config.TcpProxyConf: @@ -193,13 +205,13 @@ func NewProxy(runId string, rc *controller.ResourceController, statsCollector st default: return pxy, fmt.Errorf("proxy type not support") } - pxy.AddLogPrefix(pxy.GetName()) return } // HandleUserTcpConnection is used for incoming tcp user connections. // It can be used for tcp, http, https type. -func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) { +func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) { + xl := xlog.FromContextSafe(pxy.Context()) defer userConn.Close() // try all connections from the pool @@ -211,17 +223,18 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta var local io.ReadWriteCloser = workConn cfg := pxy.GetConf().GetBaseInfo() + xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression) if cfg.UseEncryption { local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token)) if err != nil { - pxy.Error("create encryption stream error: %v", err) + xl.Error("create encryption stream error: %v", err) return } } if cfg.UseCompression { local = frpIo.WithCompression(local) } - pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), + xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()}) @@ -235,7 +248,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta ProxyName: pxy.GetName(), TrafficBytes: outCount, }) - pxy.Debug("join connections closed") + xl.Debug("join connections closed") } type ProxyManager struct { diff --git a/server/proxy/stcp.go b/server/proxy/stcp.go index 8cd43b2..27bfb14 100644 --- a/server/proxy/stcp.go +++ b/server/proxy/stcp.go @@ -24,14 +24,14 @@ type StcpProxy struct { } func (pxy *StcpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) if errRet != nil { err = errRet return } - listener.AddLogPrefix(pxy.name) pxy.listeners = append(pxy.listeners, listener) - pxy.Info("stcp proxy custom listen success") + xl.Info("stcp proxy custom listen success") pxy.startListenHandler(pxy, HandleUserTcpConnection) return diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 388531a..0ecfe26 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -16,9 +16,9 @@ package proxy import ( "fmt" + "net" "github.com/fatedier/frp/models/config" - frpNet "github.com/fatedier/frp/utils/net" ) type TcpProxy struct { @@ -29,6 +29,7 @@ type TcpProxy struct { } func (pxy *TcpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl if pxy.cfg.Group != "" { l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort) if errRet != nil { @@ -41,10 +42,8 @@ func (pxy *TcpProxy) Run() (remoteAddr string, err error) { } }() pxy.realPort = realPort - listener := frpNet.WrapLogListener(l) - listener.AddLogPrefix(pxy.name) - pxy.listeners = append(pxy.listeners, listener) - pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) + pxy.listeners = append(pxy.listeners, l) + xl.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) } else { pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) if err != nil { @@ -55,14 +54,13 @@ func (pxy *TcpProxy) Run() (remoteAddr string, err error) { pxy.rc.TcpPortManager.Release(pxy.realPort) } }() - listener, errRet := frpNet.ListenTcp(pxy.serverCfg.ProxyBindAddr, pxy.realPort) + listener, errRet := net.Listen("tcp", fmt.Sprintf("%s:%d", pxy.serverCfg.ProxyBindAddr, pxy.realPort)) if errRet != nil { err = errRet return } - listener.AddLogPrefix(pxy.name) pxy.listeners = append(pxy.listeners, listener) - pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort) + xl.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort) } pxy.cfg.RemotePort = pxy.realPort diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 453c7b5..397f60f 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -54,6 +54,7 @@ type UdpProxy struct { } func (pxy *UdpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) if err != nil { return @@ -74,10 +75,10 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { udpConn, errRet := net.ListenUDP("udp", addr) if errRet != nil { err = errRet - pxy.Warn("listen udp port error: %v", err) + xl.Warn("listen udp port error: %v", err) return } - pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) + xl.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) pxy.udpConn = udpConn pxy.sendCh = make(chan *msg.UdpPacket, 1024) @@ -91,11 +92,11 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { rawMsg msg.Message errRet error ) - pxy.Trace("loop waiting message from udp workConn") + xl.Trace("loop waiting message from udp workConn") // client will send heartbeat in workConn for keeping alive conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { - pxy.Warn("read from workConn for udp error: %v", errRet) + xl.Warn("read from workConn for udp error: %v", errRet) conn.Close() // notify proxy to start a new work connection // ignore error here, it means the proxy is closed @@ -107,11 +108,11 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { conn.SetReadDeadline(time.Time{}) switch m := rawMsg.(type) { case *msg.Ping: - pxy.Trace("udp work conn get ping message") + xl.Trace("udp work conn get ping message") continue case *msg.UdpPacket: if errRet := errors.PanicToError(func() { - pxy.Trace("get udp message from workConn: %s", m.Content) + xl.Trace("get udp message from workConn: %s", m.Content) pxy.readCh <- m pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ ProxyName: pxy.GetName(), @@ -119,7 +120,7 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { }) }); errRet != nil { conn.Close() - pxy.Info("reader goroutine for udp work connection closed") + xl.Info("reader goroutine for udp work connection closed") return } } @@ -133,15 +134,15 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { select { case udpMsg, ok := <-pxy.sendCh: if !ok { - pxy.Info("sender goroutine for udp work connection closed") + xl.Info("sender goroutine for udp work connection closed") return } if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { - pxy.Info("sender goroutine for udp work connection closed: %v", errRet) + xl.Info("sender goroutine for udp work connection closed: %v", errRet) conn.Close() return } else { - pxy.Trace("send message to udp workConn: %s", udpMsg.Content) + xl.Trace("send message to udp workConn: %s", udpMsg.Content) pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ ProxyName: pxy.GetName(), TrafficBytes: int64(len(udpMsg.Content)), @@ -149,7 +150,7 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { continue } case <-ctx.Done(): - pxy.Info("sender goroutine for udp work connection closed") + xl.Info("sender goroutine for udp work connection closed") return } } diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go index 925485c..92291c0 100644 --- a/server/proxy/xtcp.go +++ b/server/proxy/xtcp.go @@ -31,8 +31,10 @@ type XtcpProxy struct { } func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl + if pxy.rc.NatHoleController == nil { - pxy.Error("udp port for xtcp is not specified.") + xl.Error("udp port for xtcp is not specified.") err = fmt.Errorf("xtcp is not supported in frps") return } @@ -53,7 +55,7 @@ func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { } errRet = msg.WriteMsg(workConn, m) if errRet != nil { - pxy.Warn("write nat hole sid package error, %v", errRet) + xl.Warn("write nat hole sid package error, %v", errRet) workConn.Close() break } @@ -61,12 +63,12 @@ func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { go func() { raw, errRet := msg.ReadMsg(workConn) if errRet != nil { - pxy.Warn("read nat hole client ok package error: %v", errRet) + xl.Warn("read nat hole client ok package error: %v", errRet) workConn.Close() return } if _, ok := raw.(*msg.NatHoleClientDetectOK); !ok { - pxy.Warn("read nat hole client ok package format error") + xl.Warn("read nat hole client ok package format error") workConn.Close() return } diff --git a/server/service.go b/server/service.go index 6d56101..a4d2e9d 100644 --- a/server/service.go +++ b/server/service.go @@ -16,6 +16,7 @@ package server import ( "bytes" + "context" "crypto/rand" "crypto/rsa" "crypto/tls" @@ -42,6 +43,7 @@ import ( "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" "github.com/fatedier/frp/utils/vhost" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/net/mux" fmux "github.com/hashicorp/yamux" @@ -57,16 +59,16 @@ type Service struct { muxer *mux.Mux // Accept connections from client - listener frpNet.Listener + listener net.Listener // Accept connections using kcp - kcpListener frpNet.Listener + kcpListener net.Listener // Accept connections using websocket - websocketListener frpNet.Listener + websocketListener net.Listener // Accept frp tls connections - tlsListener frpNet.Listener + tlsListener net.Listener // Manage all controllers ctlManager *ControlManager @@ -135,7 +137,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { go svr.muxer.Serve() ln = svr.muxer.DefaultListener() - svr.listener = frpNet.WrapLogListener(ln) + svr.listener = ln log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort) // Listen for accepting connections from client using kcp protocol. @@ -194,7 +196,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { } } - svr.rc.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second) + svr.rc.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(l, 30*time.Second) if err != nil { err = fmt.Errorf("Create vhost httpsMuxer error, %v", err) return @@ -203,10 +205,9 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) { } // frp tls listener - tlsListener := svr.muxer.Listen(1, 1, func(data []byte) bool { + svr.tlsListener = svr.muxer.Listen(1, 1, func(data []byte) bool { return int(data[0]) == frpNet.FRP_TLS_HEAD_BYTE }) - svr.tlsListener = frpNet.WrapLogListener(tlsListener) // Create nat hole controller. if cfg.BindUdpPort > 0 { @@ -258,7 +259,7 @@ func (svr *Service) Run() { svr.HandleListener(svr.listener) } -func (svr *Service) HandleListener(l frpNet.Listener) { +func (svr *Service) HandleListener(l net.Listener) { // Listen for incoming connections from client. for { c, err := l.Accept() @@ -266,6 +267,9 @@ func (svr *Service) HandleListener(l frpNet.Listener) { log.Warn("Listener for incoming connections from client closed") return } + // inject xlog object into net.Conn context + xl := xlog.New() + c = frpNet.NewContextConn(c, xlog.NewContext(context.Background(), xl)) log.Trace("start check TLS connection...") originConn := c @@ -278,8 +282,8 @@ func (svr *Service) HandleListener(l frpNet.Listener) { log.Trace("success check TLS connection") // Start a new goroutine for dealing connections. - go func(frpConn frpNet.Conn) { - dealFn := func(conn frpNet.Conn) { + go func(frpConn net.Conn) { + dealFn := func(conn net.Conn) { var rawMsg msg.Message conn.SetReadDeadline(time.Now().Add(connReadTimeout)) if rawMsg, err = msg.ReadMsg(conn); err != nil { @@ -295,7 +299,7 @@ func (svr *Service) HandleListener(l frpNet.Listener) { // If login failed, send error message there. // Otherwise send success message in control's work goroutine. if err != nil { - conn.Warn("%v", err) + xl.Warn("register control error: %v", err) msg.WriteMsg(conn, &msg.LoginResp{ Version: version.Full(), Error: err.Error(), @@ -306,7 +310,7 @@ func (svr *Service) HandleListener(l frpNet.Listener) { svr.RegisterWorkConn(conn, m) case *msg.NewVisitorConn: if err = svr.RegisterVisitorConn(conn, m); err != nil { - conn.Warn("%v", err) + xl.Warn("register visitor conn error: %v", err) msg.WriteMsg(conn, &msg.NewVisitorConnResp{ ProxyName: m.ProxyName, Error: err.Error(), @@ -342,8 +346,7 @@ func (svr *Service) HandleListener(l frpNet.Listener) { session.Close() return } - wrapConn := frpNet.WrapConn(stream) - go dealFn(wrapConn) + go dealFn(stream) } } else { dealFn(frpConn) @@ -352,8 +355,21 @@ func (svr *Service) HandleListener(l frpNet.Listener) { } } -func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (err error) { - ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]", +func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) { + // If client's RunId is empty, it's a new client, we just create a new controller. + // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. + if loginMsg.RunId == "" { + loginMsg.RunId, err = util.RandId() + if err != nil { + return + } + } + + ctx := frpNet.NewContextFromConn(ctlConn) + xl := xlog.FromContextSafe(ctx) + xl.AppendPrefix(loginMsg.RunId) + ctx = xlog.NewContext(ctx, xl) + xl.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]", ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch) // Check client version. @@ -368,22 +384,12 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e return } - // If client's RunId is empty, it's a new client, we just create a new controller. - // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. - if loginMsg.RunId == "" { - loginMsg.RunId, err = util.RandId() - if err != nil { - return - } - } - - ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, svr.cfg) + ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, svr.cfg) if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { oldCtl.allShutdown.WaitDone() } - ctlConn.AddLogPrefix(loginMsg.RunId) ctl.Start() // for statistics @@ -398,17 +404,18 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e } // RegisterWorkConn register a new work connection to control and proxies need it. -func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) { +func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) { + xl := frpNet.NewLogFromConn(workConn) ctl, exist := svr.ctlManager.GetById(newMsg.RunId) if !exist { - workConn.Warn("No client control found for run id [%s]", newMsg.RunId) + xl.Warn("No client control found for run id [%s]", newMsg.RunId) return } ctl.RegisterWorkConn(workConn) return } -func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error { +func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error { return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey, newMsg.UseEncryption, newMsg.UseCompression) } diff --git a/tests/ci/auto_test_frpc.ini b/tests/ci/auto_test_frpc.ini index 28ea5fd..b8d90bf 100644 --- a/tests/ci/auto_test_frpc.ini +++ b/tests/ci/auto_test_frpc.ini @@ -2,8 +2,7 @@ server_addr = 127.0.0.1 server_port = 10700 log_file = console -# debug, info, warn, error -log_level = debug +log_level = trace token = 123456 admin_port = 10600 admin_user = abc diff --git a/tests/ci/auto_test_frps.ini b/tests/ci/auto_test_frps.ini index 9300b55..8948c98 100644 --- a/tests/ci/auto_test_frps.ini +++ b/tests/ci/auto_test_frps.ini @@ -2,8 +2,7 @@ bind_addr = 0.0.0.0 bind_port = 10700 vhost_http_port = 10804 -log_file = console -log_level = debug +log_level = trace token = 123456 allow_ports = 10000-20000,20002,30000-50000 subdomain_host = sub.com diff --git a/tests/mock/echo_server.go b/tests/mock/echo_server.go index e029f50..5ae40ee 100644 --- a/tests/mock/echo_server.go +++ b/tests/mock/echo_server.go @@ -11,7 +11,7 @@ import ( ) type EchoServer struct { - l frpNet.Listener + l net.Listener port int repeatedNum int @@ -30,7 +30,7 @@ func NewEchoServer(port int, repeatedNum int, specifyStr string) *EchoServer { } func (es *EchoServer) Start() error { - l, err := frpNet.ListenTcp("127.0.0.1", es.port) + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", es.port)) if err != nil { fmt.Printf("echo server listen error: %v\n", err) return err diff --git a/tests/util/util.go b/tests/util/util.go index 4a4e6ff..163ddc2 100644 --- a/tests/util/util.go +++ b/tests/util/util.go @@ -14,7 +14,6 @@ import ( "time" "github.com/fatedier/frp/client" - frpNet "github.com/fatedier/frp/utils/net" ) func GetProxyStatus(statusAddr string, user string, passwd string, name string) (status *client.ProxyStatusResp, err error) { @@ -98,7 +97,7 @@ func ReloadConf(reloadAddr string, user string, passwd string) error { } func SendTcpMsg(addr string, msg string) (res string, err error) { - c, err := frpNet.ConnectTcpServer(addr) + c, err := net.Dial("tcp", addr) if err != nil { err = fmt.Errorf("connect to tcp server error: %v", err) return diff --git a/utils/log/log.go b/utils/log/log.go index 1a9033b..1ddf4cd 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -91,71 +91,3 @@ func Debug(format string, v ...interface{}) { func Trace(format string, v ...interface{}) { Log.Trace(format, v...) } - -// Logger is the log interface -type Logger interface { - AddLogPrefix(string) - GetPrefixStr() string - GetAllPrefix() []string - ClearLogPrefix() - Error(string, ...interface{}) - Warn(string, ...interface{}) - Info(string, ...interface{}) - Debug(string, ...interface{}) - Trace(string, ...interface{}) -} - -type PrefixLogger struct { - prefix string - allPrefix []string -} - -func NewPrefixLogger(prefix string) *PrefixLogger { - logger := &PrefixLogger{ - allPrefix: make([]string, 0), - } - logger.AddLogPrefix(prefix) - return logger -} - -func (pl *PrefixLogger) AddLogPrefix(prefix string) { - if len(prefix) == 0 { - return - } - - pl.prefix += "[" + prefix + "] " - pl.allPrefix = append(pl.allPrefix, prefix) -} - -func (pl *PrefixLogger) GetPrefixStr() string { - return pl.prefix -} - -func (pl *PrefixLogger) GetAllPrefix() []string { - return pl.allPrefix -} - -func (pl *PrefixLogger) ClearLogPrefix() { - pl.prefix = "" - pl.allPrefix = make([]string, 0) -} - -func (pl *PrefixLogger) Error(format string, v ...interface{}) { - Log.Error(pl.prefix+format, v...) -} - -func (pl *PrefixLogger) Warn(format string, v ...interface{}) { - Log.Warn(pl.prefix+format, v...) -} - -func (pl *PrefixLogger) Info(format string, v ...interface{}) { - Log.Info(pl.prefix+format, v...) -} - -func (pl *PrefixLogger) Debug(format string, v ...interface{}) { - Log.Debug(pl.prefix+format, v...) -} - -func (pl *PrefixLogger) Trace(format string, v ...interface{}) { - Log.Trace(pl.prefix+format, v...) -} diff --git a/utils/net/conn.go b/utils/net/conn.go index 9f415ec..6b1d3fa 100644 --- a/utils/net/conn.go +++ b/utils/net/conn.go @@ -15,6 +15,7 @@ package net import ( + "context" "crypto/tls" "errors" "fmt" @@ -23,41 +24,64 @@ import ( "sync/atomic" "time" - "github.com/fatedier/frp/utils/log" - + "github.com/fatedier/frp/utils/xlog" gnet "github.com/fatedier/golib/net" kcp "github.com/fatedier/kcp-go" ) -// Conn is the interface of connections used in frp. -type Conn interface { - net.Conn - log.Logger +type ContextGetter interface { + Context() context.Context } -type WrapLogConn struct { - net.Conn - log.Logger +type ContextSetter interface { + WithContext(ctx context.Context) } -func WrapConn(c net.Conn) Conn { - return &WrapLogConn{ - Conn: c, - Logger: log.NewPrefixLogger(""), +func NewLogFromConn(conn net.Conn) *xlog.Logger { + if c, ok := conn.(ContextGetter); ok { + return xlog.FromContextSafe(c.Context()) } + return xlog.New() +} + +func NewContextFromConn(conn net.Conn) context.Context { + if c, ok := conn.(ContextGetter); ok { + return c.Context() + } + return context.Background() +} + +// ContextConn is the connection with context +type ContextConn struct { + net.Conn + + ctx context.Context +} + +func NewContextConn(c net.Conn, ctx context.Context) *ContextConn { + return &ContextConn{ + Conn: c, + ctx: ctx, + } +} + +func (c *ContextConn) WithContext(ctx context.Context) { + c.ctx = ctx +} + +func (c *ContextConn) Context() context.Context { + return c.ctx } type WrapReadWriteCloserConn struct { io.ReadWriteCloser - log.Logger underConn net.Conn } -func WrapReadWriteCloserToConn(rwc io.ReadWriteCloser, underConn net.Conn) Conn { +func WrapReadWriteCloserToConn(rwc io.ReadWriteCloser, underConn net.Conn) net.Conn { return &WrapReadWriteCloserConn{ ReadWriteCloser: rwc, - Logger: log.NewPrefixLogger(""), underConn: underConn, } } @@ -99,7 +123,6 @@ func (conn *WrapReadWriteCloserConn) SetWriteDeadline(t time.Time) error { type CloseNotifyConn struct { net.Conn - log.Logger // 1 means closed closeFlag int32 @@ -108,10 +131,9 @@ type CloseNotifyConn struct { } // closeFn will be only called once -func WrapCloseNotifyConn(c net.Conn, closeFn func()) Conn { +func WrapCloseNotifyConn(c net.Conn, closeFn func()) net.Conn { return &CloseNotifyConn{ Conn: c, - Logger: log.NewPrefixLogger(""), closeFn: closeFn, } } @@ -128,7 +150,7 @@ func (cc *CloseNotifyConn) Close() (err error) { } type StatsConn struct { - Conn + net.Conn closed int64 // 1 means closed totalRead int64 @@ -136,7 +158,7 @@ type StatsConn struct { statsFunc func(totalRead, totalWrite int64) } -func WrapStatsConn(conn Conn, statsFunc func(total, totalWrite int64)) *StatsConn { +func WrapStatsConn(conn net.Conn, statsFunc func(total, totalWrite int64)) *StatsConn { return &StatsConn{ Conn: conn, statsFunc: statsFunc, @@ -166,10 +188,10 @@ func (statsConn *StatsConn) Close() (err error) { return } -func ConnectServer(protocol string, addr string) (c Conn, err error) { +func ConnectServer(protocol string, addr string) (c net.Conn, err error) { switch protocol { case "tcp": - return ConnectTcpServer(addr) + return net.Dial("tcp", addr) case "kcp": kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3) if errRet != nil { @@ -184,21 +206,17 @@ func ConnectServer(protocol string, addr string) (c Conn, err error) { kcpConn.SetACKNoDelay(false) kcpConn.SetReadBuffer(4194304) kcpConn.SetWriteBuffer(4194304) - c = WrapConn(kcpConn) + c = kcpConn return default: return nil, fmt.Errorf("unsupport protocol: %s", protocol) } } -func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn, err error) { +func ConnectServerByProxy(proxyURL string, protocol string, addr string) (c net.Conn, err error) { switch protocol { case "tcp": - var conn net.Conn - if conn, err = gnet.DialTcpByProxy(proxyUrl, addr); err != nil { - return - } - return WrapConn(conn), nil + return gnet.DialTcpByProxy(proxyURL, addr) case "kcp": // http proxy is not supported for kcp return ConnectServer(protocol, addr) @@ -209,7 +227,7 @@ func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn } } -func ConnectServerByProxyWithTLS(proxyUrl string, protocol string, addr string, tlsConfig *tls.Config) (c Conn, err error) { +func ConnectServerByProxyWithTLS(proxyUrl string, protocol string, addr string, tlsConfig *tls.Config) (c net.Conn, err error) { c, err = ConnectServerByProxy(proxyUrl, protocol, addr) if err != nil { return diff --git a/utils/net/kcp.go b/utils/net/kcp.go index 3d080fd..39eb898 100644 --- a/utils/net/kcp.go +++ b/utils/net/kcp.go @@ -18,17 +18,13 @@ import ( "fmt" "net" - "github.com/fatedier/frp/utils/log" - kcp "github.com/fatedier/kcp-go" ) type KcpListener struct { - net.Addr listener net.Listener - accept chan Conn + acceptCh chan net.Conn closeFlag bool - log.Logger } func ListenKcp(bindAddr string, bindPort int) (l *KcpListener, err error) { @@ -40,11 +36,9 @@ func ListenKcp(bindAddr string, bindPort int) (l *KcpListener, err error) { listener.SetWriteBuffer(4194304) l = &KcpListener{ - Addr: listener.Addr(), listener: listener, - accept: make(chan Conn), + acceptCh: make(chan net.Conn), closeFlag: false, - Logger: log.NewPrefixLogger(""), } go func() { @@ -52,7 +46,7 @@ func ListenKcp(bindAddr string, bindPort int) (l *KcpListener, err error) { conn, err := listener.AcceptKCP() if err != nil { if l.closeFlag { - close(l.accept) + close(l.acceptCh) return } continue @@ -64,14 +58,14 @@ func ListenKcp(bindAddr string, bindPort int) (l *KcpListener, err error) { conn.SetWindowSize(1024, 1024) conn.SetACKNoDelay(false) - l.accept <- WrapConn(conn) + l.acceptCh <- conn } }() return l, err } -func (l *KcpListener) Accept() (Conn, error) { - conn, ok := <-l.accept +func (l *KcpListener) Accept() (net.Conn, error) { + conn, ok := <-l.acceptCh if !ok { return conn, fmt.Errorf("channel for kcp listener closed") } @@ -86,6 +80,10 @@ func (l *KcpListener) Close() error { return nil } +func (l *KcpListener) Addr() net.Addr { + return l.listener.Addr() +} + func NewKcpConnFromUdp(conn *net.UDPConn, connected bool, raddr string) (net.Conn, error) { kcpConn, err := kcp.NewConnEx(1, connected, raddr, nil, 10, 3, conn) if err != nil { diff --git a/utils/net/listener.go b/utils/net/listener.go index 90ea59b..3b199c8 100644 --- a/utils/net/listener.go +++ b/utils/net/listener.go @@ -19,65 +19,34 @@ import ( "net" "sync" - "github.com/fatedier/frp/utils/log" - "github.com/fatedier/golib/errors" ) -type Listener interface { - Accept() (Conn, error) - Close() error - log.Logger -} - -type LogListener struct { - l net.Listener - net.Listener - log.Logger -} - -func WrapLogListener(l net.Listener) Listener { - return &LogListener{ - l: l, - Listener: l, - Logger: log.NewPrefixLogger(""), - } -} - -func (logL *LogListener) Accept() (Conn, error) { - c, err := logL.l.Accept() - return WrapConn(c), err -} - // Custom listener type CustomListener struct { - conns chan Conn - closed bool - mu sync.Mutex - - log.Logger + acceptCh chan net.Conn + closed bool + mu sync.Mutex } func NewCustomListener() *CustomListener { return &CustomListener{ - conns: make(chan Conn, 64), - Logger: log.NewPrefixLogger(""), + acceptCh: make(chan net.Conn, 64), } } -func (l *CustomListener) Accept() (Conn, error) { - conn, ok := <-l.conns +func (l *CustomListener) Accept() (net.Conn, error) { + conn, ok := <-l.acceptCh if !ok { return nil, fmt.Errorf("listener closed") } - conn.AddLogPrefix(l.GetPrefixStr()) return conn, nil } -func (l *CustomListener) PutConn(conn Conn) error { +func (l *CustomListener) PutConn(conn net.Conn) error { err := errors.PanicToError(func() { select { - case l.conns <- conn: + case l.acceptCh <- conn: default: conn.Close() } @@ -89,7 +58,7 @@ func (l *CustomListener) Close() error { l.mu.Lock() defer l.mu.Unlock() if !l.closed { - close(l.conns) + close(l.acceptCh) l.closed = true } return nil diff --git a/utils/net/tcp.go b/utils/net/tcp.go deleted file mode 100644 index 5c490d9..0000000 --- a/utils/net/tcp.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2016 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package net - -import ( - "fmt" - "net" - - "github.com/fatedier/frp/utils/log" -) - -type TcpListener struct { - net.Addr - listener net.Listener - accept chan Conn - closeFlag bool - log.Logger -} - -func ListenTcp(bindAddr string, bindPort int) (l *TcpListener, err error) { - tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", bindAddr, bindPort)) - if err != nil { - return l, err - } - listener, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - return l, err - } - - l = &TcpListener{ - Addr: listener.Addr(), - listener: listener, - accept: make(chan Conn), - closeFlag: false, - Logger: log.NewPrefixLogger(""), - } - - go func() { - for { - conn, err := listener.AcceptTCP() - if err != nil { - if l.closeFlag { - close(l.accept) - return - } - continue - } - - c := NewTcpConn(conn) - l.accept <- c - } - }() - return l, err -} - -// Wait util get one new connection or listener is closed -// if listener is closed, err returned. -func (l *TcpListener) Accept() (Conn, error) { - conn, ok := <-l.accept - if !ok { - return conn, fmt.Errorf("channel for tcp listener closed") - } - return conn, nil -} - -func (l *TcpListener) Close() error { - if !l.closeFlag { - l.closeFlag = true - l.listener.Close() - } - return nil -} - -// Wrap for TCPConn. -type TcpConn struct { - net.Conn - log.Logger -} - -func NewTcpConn(conn net.Conn) (c *TcpConn) { - c = &TcpConn{ - Conn: conn, - Logger: log.NewPrefixLogger(""), - } - return -} - -func ConnectTcpServer(addr string) (c Conn, err error) { - servertAddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - return - } - conn, err := net.DialTCP("tcp", nil, servertAddr) - if err != nil { - return - } - c = NewTcpConn(conn) - return -} diff --git a/utils/net/tls.go b/utils/net/tls.go index 4ac51d5..b9fca31 100644 --- a/utils/net/tls.go +++ b/utils/net/tls.go @@ -26,13 +26,13 @@ var ( FRP_TLS_HEAD_BYTE = 0x17 ) -func WrapTLSClientConn(c net.Conn, tlsConfig *tls.Config) (out Conn) { +func WrapTLSClientConn(c net.Conn, tlsConfig *tls.Config) (out net.Conn) { c.Write([]byte{byte(FRP_TLS_HEAD_BYTE)}) - out = WrapConn(tls.Client(c, tlsConfig)) + out = tls.Client(c, tlsConfig) return } -func CheckAndEnableTLSServerConnWithTimeout(c net.Conn, tlsConfig *tls.Config, timeout time.Duration) (out Conn, err error) { +func CheckAndEnableTLSServerConnWithTimeout(c net.Conn, tlsConfig *tls.Config, timeout time.Duration) (out net.Conn, err error) { sc, r := gnet.NewSharedConnSize(c, 2) buf := make([]byte, 1) var n int @@ -44,9 +44,9 @@ func CheckAndEnableTLSServerConnWithTimeout(c net.Conn, tlsConfig *tls.Config, t } if n == 1 && int(buf[0]) == FRP_TLS_HEAD_BYTE { - out = WrapConn(tls.Server(c, tlsConfig)) + out = tls.Server(c, tlsConfig) } else { - out = WrapConn(sc) + out = sc } return } diff --git a/utils/net/udp.go b/utils/net/udp.go index e748e43..28a6813 100644 --- a/utils/net/udp.go +++ b/utils/net/udp.go @@ -21,8 +21,6 @@ import ( "sync" "time" - "github.com/fatedier/frp/utils/log" - "github.com/fatedier/golib/pool" ) @@ -33,7 +31,6 @@ type UdpPacket struct { } type FakeUdpConn struct { - log.Logger l *UdpListener localAddr net.Addr @@ -47,7 +44,6 @@ type FakeUdpConn struct { func NewFakeUdpConn(l *UdpListener, laddr, raddr net.Addr) *FakeUdpConn { fc := &FakeUdpConn{ - Logger: log.NewPrefixLogger(""), l: l, localAddr: laddr, remoteAddr: raddr, @@ -157,15 +153,13 @@ func (c *FakeUdpConn) SetWriteDeadline(t time.Time) error { } type UdpListener struct { - net.Addr - accept chan Conn + addr net.Addr + acceptCh chan net.Conn writeCh chan *UdpPacket readConn net.Conn closeFlag bool fakeConns map[string]*FakeUdpConn - - log.Logger } func ListenUDP(bindAddr string, bindPort int) (l *UdpListener, err error) { @@ -176,11 +170,10 @@ func ListenUDP(bindAddr string, bindPort int) (l *UdpListener, err error) { readConn, err := net.ListenUDP("udp", udpAddr) l = &UdpListener{ - Addr: udpAddr, - accept: make(chan Conn), + addr: udpAddr, + acceptCh: make(chan net.Conn), writeCh: make(chan *UdpPacket, 1000), fakeConns: make(map[string]*FakeUdpConn), - Logger: log.NewPrefixLogger(""), } // for reading @@ -189,19 +182,19 @@ func ListenUDP(bindAddr string, bindPort int) (l *UdpListener, err error) { buf := pool.GetBuf(1450) n, remoteAddr, err := readConn.ReadFromUDP(buf) if err != nil { - close(l.accept) + close(l.acceptCh) close(l.writeCh) return } fakeConn, exist := l.fakeConns[remoteAddr.String()] if !exist || fakeConn.IsClosed() { - fakeConn = NewFakeUdpConn(l, l.Addr, remoteAddr) + fakeConn = NewFakeUdpConn(l, l.Addr(), remoteAddr) l.fakeConns[remoteAddr.String()] = fakeConn } fakeConn.putPacket(buf[:n]) - l.accept <- fakeConn + l.acceptCh <- fakeConn } }() @@ -226,7 +219,6 @@ func (l *UdpListener) writeUdpPacket(packet *UdpPacket) (err error) { defer func() { if errRet := recover(); errRet != nil { err = fmt.Errorf("udp write closed listener") - l.Info("udp write closed listener") } }() l.writeCh <- packet @@ -243,8 +235,8 @@ func (l *UdpListener) WriteMsg(buf []byte, remoteAddr *net.UDPAddr) (err error) return } -func (l *UdpListener) Accept() (Conn, error) { - conn, ok := <-l.accept +func (l *UdpListener) Accept() (net.Conn, error) { + conn, ok := <-l.acceptCh if !ok { return conn, fmt.Errorf("channel for udp listener closed") } @@ -258,3 +250,7 @@ func (l *UdpListener) Close() error { } return nil } + +func (l *UdpListener) Addr() net.Addr { + return l.addr +} diff --git a/utils/net/websocket.go b/utils/net/websocket.go index 9942337..36b6440 100644 --- a/utils/net/websocket.go +++ b/utils/net/websocket.go @@ -8,8 +8,6 @@ import ( "net/url" "time" - "github.com/fatedier/frp/utils/log" - "golang.org/x/net/websocket" ) @@ -22,10 +20,8 @@ const ( ) type WebsocketListener struct { - net.Addr - ln net.Listener - accept chan Conn - log.Logger + ln net.Listener + acceptCh chan net.Conn server *http.Server httpMutex *http.ServeMux @@ -35,9 +31,7 @@ type WebsocketListener struct { // ln: tcp listener for websocket connections func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) { wl = &WebsocketListener{ - Addr: ln.Addr(), - accept: make(chan Conn), - Logger: log.NewPrefixLogger(""), + acceptCh: make(chan net.Conn), } muxer := http.NewServeMux() @@ -46,7 +40,7 @@ func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) { conn := WrapCloseNotifyConn(c, func() { close(notifyCh) }) - wl.accept <- conn + wl.acceptCh <- conn <-notifyCh })) @@ -68,8 +62,8 @@ func ListenWebsocket(bindAddr string, bindPort int) (*WebsocketListener, error) return l, nil } -func (p *WebsocketListener) Accept() (Conn, error) { - c, ok := <-p.accept +func (p *WebsocketListener) Accept() (net.Conn, error) { + c, ok := <-p.acceptCh if !ok { return nil, ErrWebsocketListenerClosed } @@ -80,8 +74,12 @@ func (p *WebsocketListener) Close() error { return p.server.Close() } +func (p *WebsocketListener) Addr() net.Addr { + return p.ln.Addr() +} + // addr: domain:port -func ConnectWebsocketServer(addr string) (Conn, error) { +func ConnectWebsocketServer(addr string) (net.Conn, error) { addr = "ws://" + addr + FrpWebsocketPath uri, err := url.Parse(addr) if err != nil { @@ -101,6 +99,5 @@ func ConnectWebsocketServer(addr string) (Conn, error) { if err != nil { return nil, err } - c := WrapConn(conn) - return c, nil + return conn, nil } diff --git a/utils/vhost/https.go b/utils/vhost/https.go index 12fc8d0..5317701 100644 --- a/utils/vhost/https.go +++ b/utils/vhost/https.go @@ -17,11 +17,10 @@ package vhost import ( "fmt" "io" + "net" "strings" "time" - frpNet "github.com/fatedier/frp/utils/net" - gnet "github.com/fatedier/golib/net" "github.com/fatedier/golib/pool" ) @@ -48,7 +47,7 @@ type HttpsMuxer struct { *VhostMuxer } -func NewHttpsMuxer(listener frpNet.Listener, timeout time.Duration) (*HttpsMuxer, error) { +func NewHttpsMuxer(listener net.Listener, timeout time.Duration) (*HttpsMuxer, error) { mux, err := NewVhostMuxer(listener, GetHttpsHostname, nil, nil, timeout) return &HttpsMuxer{mux}, err } @@ -182,7 +181,7 @@ func readHandshake(rd io.Reader) (host string, err error) { return } -func GetHttpsHostname(c frpNet.Conn) (_ frpNet.Conn, _ map[string]string, err error) { +func GetHttpsHostname(c net.Conn) (_ net.Conn, _ map[string]string, err error) { reqInfoMap := make(map[string]string, 0) sc, rd := gnet.NewSharedConn(c) host, err := readHandshake(rd) @@ -191,5 +190,5 @@ func GetHttpsHostname(c frpNet.Conn) (_ frpNet.Conn, _ map[string]string, err er } reqInfoMap["Host"] = host reqInfoMap["Scheme"] = "https" - return frpNet.WrapConn(sc), reqInfoMap, nil + return sc, reqInfoMap, nil } diff --git a/utils/vhost/vhost.go b/utils/vhost/vhost.go index d3e54fa..57f8239 100644 --- a/utils/vhost/vhost.go +++ b/utils/vhost/vhost.go @@ -13,22 +13,25 @@ package vhost import ( + "context" "fmt" + "net" "strings" "time" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/xlog" "github.com/fatedier/golib/errors" ) -type muxFunc func(frpNet.Conn) (frpNet.Conn, map[string]string, error) -type httpAuthFunc func(frpNet.Conn, string, string, string) (bool, error) -type hostRewriteFunc func(frpNet.Conn, string) (frpNet.Conn, error) +type muxFunc func(net.Conn) (net.Conn, map[string]string, error) +type httpAuthFunc func(net.Conn, string, string, string) (bool, error) +type hostRewriteFunc func(net.Conn, string) (net.Conn, error) type VhostMuxer struct { - listener frpNet.Listener + listener net.Listener timeout time.Duration vhostFunc muxFunc authFunc httpAuthFunc @@ -36,7 +39,7 @@ type VhostMuxer struct { registryRouter *VhostRouters } -func NewVhostMuxer(listener frpNet.Listener, vhostFunc muxFunc, authFunc httpAuthFunc, rewriteFunc hostRewriteFunc, timeout time.Duration) (mux *VhostMuxer, err error) { +func NewVhostMuxer(listener net.Listener, vhostFunc muxFunc, authFunc httpAuthFunc, rewriteFunc hostRewriteFunc, timeout time.Duration) (mux *VhostMuxer, err error) { mux = &VhostMuxer{ listener: listener, timeout: timeout, @@ -49,7 +52,7 @@ func NewVhostMuxer(listener frpNet.Listener, vhostFunc muxFunc, authFunc httpAut return mux, nil } -type CreateConnFunc func(remoteAddr string) (frpNet.Conn, error) +type CreateConnFunc func(remoteAddr string) (net.Conn, error) // VhostRouteConfig is the params used to match HTTP requests type VhostRouteConfig struct { @@ -65,7 +68,7 @@ type VhostRouteConfig struct { // listen for a new domain name, if rewriteHost is not empty and rewriteFunc is not nil // then rewrite the host header to rewriteHost -func (v *VhostMuxer) Listen(cfg *VhostRouteConfig) (l *Listener, err error) { +func (v *VhostMuxer) Listen(ctx context.Context, cfg *VhostRouteConfig) (l *Listener, err error) { l = &Listener{ name: cfg.Domain, location: cfg.Location, @@ -73,8 +76,8 @@ func (v *VhostMuxer) Listen(cfg *VhostRouteConfig) (l *Listener, err error) { userName: cfg.Username, passWord: cfg.Password, mux: v, - accept: make(chan frpNet.Conn), - Logger: log.NewPrefixLogger(""), + accept: make(chan net.Conn), + ctx: ctx, } err = v.registryRouter.Add(cfg.Domain, cfg.Location, l) if err != nil { @@ -123,7 +126,7 @@ func (v *VhostMuxer) run() { } } -func (v *VhostMuxer) handle(c frpNet.Conn) { +func (v *VhostMuxer) handle(c net.Conn) { if err := c.SetDeadline(time.Now().Add(v.timeout)); err != nil { c.Close() return @@ -146,13 +149,14 @@ func (v *VhostMuxer) handle(c frpNet.Conn) { c.Close() return } + xl := xlog.FromContextSafe(l.ctx) // if authFunc is exist and userName/password is set // then verify user access if l.mux.authFunc != nil && l.userName != "" && l.passWord != "" { bAccess, err := l.mux.authFunc(c, l.userName, l.passWord, reqInfoMap["Authorization"]) if bAccess == false || err != nil { - l.Debug("check http Authorization failed") + xl.Debug("check http Authorization failed") res := noAuthResponse() res.Write(c) c.Close() @@ -166,12 +170,12 @@ func (v *VhostMuxer) handle(c frpNet.Conn) { } c = sConn - l.Debug("get new http request host [%s] path [%s]", name, path) + xl.Debug("get new http request host [%s] path [%s]", name, path) err = errors.PanicToError(func() { l.accept <- c }) if err != nil { - l.Warn("listener is already closed, ignore this request") + xl.Warn("listener is already closed, ignore this request") } } @@ -182,11 +186,12 @@ type Listener struct { userName string passWord string mux *VhostMuxer // for closing VhostMuxer - accept chan frpNet.Conn - log.Logger + accept chan net.Conn + ctx context.Context } -func (l *Listener) Accept() (frpNet.Conn, error) { +func (l *Listener) Accept() (net.Conn, error) { + xl := xlog.FromContextSafe(l.ctx) conn, ok := <-l.accept if !ok { return nil, fmt.Errorf("Listener closed") @@ -198,17 +203,13 @@ func (l *Listener) Accept() (frpNet.Conn, error) { if l.mux.rewriteFunc != nil { sConn, err := l.mux.rewriteFunc(conn, l.rewriteHost) if err != nil { - l.Warn("host header rewrite failed: %v", err) + xl.Warn("host header rewrite failed: %v", err) return nil, fmt.Errorf("host header rewrite failed") } - l.Debug("rewrite host to [%s] success", l.rewriteHost) + xl.Debug("rewrite host to [%s] success", l.rewriteHost) conn = sConn } - - for _, prefix := range l.GetAllPrefix() { - conn.AddLogPrefix(prefix) - } - return conn, nil + return frpNet.NewContextConn(conn, l.ctx), nil } func (l *Listener) Close() error { @@ -220,3 +221,7 @@ func (l *Listener) Close() error { func (l *Listener) Name() string { return l.name } + +func (l *Listener) Addr() net.Addr { + return (*net.TCPAddr)(nil) +} diff --git a/utils/xlog/ctx.go b/utils/xlog/ctx.go new file mode 100644 index 0000000..1d3619b --- /dev/null +++ b/utils/xlog/ctx.go @@ -0,0 +1,42 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xlog + +import ( + "context" +) + +type key int + +const ( + xlogKey key = 0 +) + +func NewContext(ctx context.Context, xl *Logger) context.Context { + return context.WithValue(ctx, xlogKey, xl) +} + +func FromContext(ctx context.Context) (xl *Logger, ok bool) { + xl, ok = ctx.Value(xlogKey).(*Logger) + return +} + +func FromContextSafe(ctx context.Context) *Logger { + xl, ok := ctx.Value(xlogKey).(*Logger) + if !ok { + xl = New() + } + return xl +} diff --git a/utils/xlog/xlog.go b/utils/xlog/xlog.go new file mode 100644 index 0000000..a1c4867 --- /dev/null +++ b/utils/xlog/xlog.go @@ -0,0 +1,73 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xlog + +import ( + "github.com/fatedier/frp/utils/log" +) + +// Logger is not thread safety for operations on prefix +type Logger struct { + prefixes []string + + prefixString string +} + +func New() *Logger { + return &Logger{ + prefixes: make([]string, 0), + } +} + +func (l *Logger) ResetPrefixes() (old []string) { + old = l.prefixes + l.prefixes = make([]string, 0) + l.prefixString = "" + return +} + +func (l *Logger) AppendPrefix(prefix string) *Logger { + l.prefixes = append(l.prefixes, prefix) + l.prefixString += "[" + prefix + "] " + return l +} + +func (l *Logger) Spawn() *Logger { + nl := New() + for _, v := range l.prefixes { + nl.AppendPrefix(v) + } + return nl +} + +func (l *Logger) Error(format string, v ...interface{}) { + log.Log.Error(l.prefixString+format, v...) +} + +func (l *Logger) Warn(format string, v ...interface{}) { + log.Log.Warn(l.prefixString+format, v...) +} + +func (l *Logger) Info(format string, v ...interface{}) { + log.Log.Info(l.prefixString+format, v...) +} + +func (l *Logger) Debug(format string, v ...interface{}) { + log.Log.Debug(l.prefixString+format, v...) +} + +func (l *Logger) Trace(format string, v ...interface{}) { + log.Log.Trace(l.prefixString+format, v...) +}