optimize some code (#3801)

This commit is contained in:
fatedier 2023-11-27 15:47:49 +08:00 committed by GitHub
parent d5b41f1e14
commit 69ae2b0b69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 880 additions and 600 deletions

View File

@ -1,6 +1,9 @@
### Features
* New command line parameter `--strict_config` is added to enable strict configuration validation mode. It will throw an error for non-existent fields instead of ignoring them.
* New command line parameter `--strict_config` is added to enable strict configuration validation mode. It will throw an error for non-existent fields instead of ignoring them. In future versions, we may set the default value of this parameter to true.
* Support `SSH reverse tunneling`. With this feature, you can expose your local service without running frpc, only using SSH. The SSH reverse tunnel agent has many functional limitations compared to the frpc agent. The currently supported proxy types are tcp, http, https, tcpmux, and stcp.
* The frpc tcpmux command line parameters have been updated to support configuring `http_user` and `http_pwd`.
* The frpc stcp/sudp/xtcp command line parameters have been updated to support configuring `allow_users`.
### Fixes

View File

@ -1,85 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"net"
"net/http"
"net/http/pprof"
"time"
"github.com/gorilla/mux"
"github.com/fatedier/frp/assets"
utilnet "github.com/fatedier/frp/pkg/util/net"
)
var (
httpServerReadTimeout = 60 * time.Second
httpServerWriteTimeout = 60 * time.Second
)
func (svr *Service) RunAdminServer(address string) (err error) {
// url router
router := mux.NewRouter()
router.HandleFunc("/healthz", svr.healthz)
// debug
if svr.cfg.WebServer.PprofEnable {
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}
subRouter := router.NewRoute().Subrouter()
user, passwd := svr.cfg.WebServer.User, svr.cfg.WebServer.Password
subRouter.Use(utilnet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
// api, see admin_api.go
subRouter.HandleFunc("/api/reload", svr.apiReload).Methods("GET")
subRouter.HandleFunc("/api/stop", svr.apiStop).Methods("POST")
subRouter.HandleFunc("/api/status", svr.apiStatus).Methods("GET")
subRouter.HandleFunc("/api/config", svr.apiGetConfig).Methods("GET")
subRouter.HandleFunc("/api/config", svr.apiPutConfig).Methods("PUT")
// view
subRouter.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})
server := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: httpServerReadTimeout,
WriteTimeout: httpServerWriteTimeout,
}
if address == "" {
address = ":http"
}
ln, err := net.Listen("tcp", address)
if err != nil {
return err
}
go func() {
_ = server.Serve(ln)
}()
return
}

View File

@ -31,7 +31,9 @@ import (
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/config/v1/validation"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/log"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
type GeneralResponse struct {
@ -39,6 +41,29 @@ type GeneralResponse struct {
Msg string
}
func (svr *Service) registerRouteHandlers(helper *httppkg.RouterRegisterHelper) {
helper.Router.HandleFunc("/healthz", svr.healthz)
subRouter := helper.Router.NewRoute().Subrouter()
subRouter.Use(helper.AuthMiddleware.Middleware)
// api, see admin_api.go
subRouter.HandleFunc("/api/reload", svr.apiReload).Methods("GET")
subRouter.HandleFunc("/api/stop", svr.apiStop).Methods("POST")
subRouter.HandleFunc("/api/status", svr.apiStatus).Methods("GET")
subRouter.HandleFunc("/api/config", svr.apiGetConfig).Methods("GET")
subRouter.HandleFunc("/api/config", svr.apiPutConfig).Methods("PUT")
// view
subRouter.Handle("/favicon.ico", http.FileServer(helper.AssetsFS)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(
netpkg.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(helper.AssetsFS))),
).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})
}
// /healthz
func (svr *Service) healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
@ -62,21 +87,21 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request) {
}
}()
cliCfg, pxyCfgs, visitorCfgs, _, err := config.LoadClientConfig(svr.cfgFile, strictConfigMode)
cliCfg, proxyCfgs, visitorCfgs, _, err := config.LoadClientConfig(svr.configFilePath, strictConfigMode)
if err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
return
}
if _, err := validation.ValidateAllClientConfig(cliCfg, pxyCfgs, visitorCfgs); err != nil {
if _, err := validation.ValidateAllClientConfig(cliCfg, proxyCfgs, visitorCfgs); err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
return
}
if err := svr.ReloadConf(pxyCfgs, visitorCfgs); err != nil {
if err := svr.UpdateAllConfigurer(proxyCfgs, visitorCfgs); err != nil {
res.Code = 500
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
@ -158,7 +183,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, _ *http.Request) {
ps := ctl.pm.GetAllProxyStatus()
for _, status := range ps {
res[status.Type] = append(res[status.Type], NewProxyStatusResp(status, svr.cfg.ServerAddr))
res[status.Type] = append(res[status.Type], NewProxyStatusResp(status, svr.common.ServerAddr))
}
for _, arrs := range res {
@ -184,14 +209,14 @@ func (svr *Service) apiGetConfig(w http.ResponseWriter, _ *http.Request) {
}
}()
if svr.cfgFile == "" {
if svr.configFilePath == "" {
res.Code = 400
res.Msg = "frpc has no config file path"
log.Warn("%s", res.Msg)
return
}
content, err := os.ReadFile(svr.cfgFile)
content, err := os.ReadFile(svr.configFilePath)
if err != nil {
res.Code = 400
res.Msg = err.Error()
@ -230,7 +255,7 @@ func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) {
return
}
if err := os.WriteFile(svr.cfgFile, body, 0o644); err != nil {
if err := os.WriteFile(svr.configFilePath, body, 0o644); err != nil {
res.Code = 500
res.Msg = fmt.Sprintf("write content to frpc config file error: %v", err)
log.Warn("%s", res.Msg)

View File

@ -21,6 +21,7 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"
libdial "github.com/fatedier/golib/net/dial"
@ -30,7 +31,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
@ -48,6 +49,7 @@ type defaultConnectorImpl struct {
muxSession *fmux.Session
quicConn quic.Connection
closeOnce sync.Once
}
func NewConnector(ctx context.Context, cfg *v1.ClientCommonConfig) Connector {
@ -130,7 +132,7 @@ func (c *defaultConnectorImpl) Connect() (net.Conn, error) {
if err != nil {
return nil, err
}
return utilnet.QuicStreamToNetConn(stream, c.quicConn), nil
return netpkg.QuicStreamToNetConn(stream, c.quicConn), nil
} else if c.muxSession != nil {
stream, err := c.muxSession.OpenStream()
if err != nil {
@ -177,19 +179,19 @@ func (c *defaultConnectorImpl) realConnect() (net.Conn, error) {
switch protocol {
case "websocket":
protocol = "tcp"
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, "")}))
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: netpkg.DialHookWebsocket(protocol, "")}))
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
Hook: netpkg.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
}))
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
case "wss":
protocol = "tcp"
dialOptions = append(dialOptions, libdial.WithTLSConfigAndPriority(100, tlsConfig))
// Make sure that if it is wss, the websocket hook is executed after the tls hook.
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, tlsConfig.ServerName), Priority: 110}))
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: netpkg.DialHookWebsocket(protocol, tlsConfig.ServerName), Priority: 110}))
default:
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
Hook: netpkg.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
}))
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
}
@ -213,11 +215,13 @@ func (c *defaultConnectorImpl) realConnect() (net.Conn, error) {
}
func (c *defaultConnectorImpl) Close() error {
c.closeOnce.Do(func() {
if c.quicConn != nil {
_ = c.quicConn.CloseWithError(0, "")
}
if c.muxSession != nil {
_ = c.muxSession.Close()
}
})
return nil
}

View File

@ -28,39 +28,42 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
)
type SessionContext struct {
// The client common configuration.
Common *v1.ClientCommonConfig
// Unique ID obtained from frps.
// It should be attached to the login message when reconnecting.
RunID string
// Underlying control connection. Once conn is closed, the msgDispatcher and the entire Control will exit.
Conn net.Conn
// Indicates whether the connection is encrypted.
ConnEncrypted bool
// Sets authentication based on selected method
AuthSetter auth.Setter
// Connector is used to create new connections, which could be real TCP connections or virtual streams.
Connector Connector
}
type Control struct {
// service context
ctx context.Context
xl *xlog.Logger
// The client configuration
clientCfg *v1.ClientCommonConfig
// sets authentication based on selected method
authSetter auth.Setter
// Unique ID obtained from frps.
// It should be attached to the login message when reconnecting.
runID string
// session context
sessionCtx *SessionContext
// manage all proxies
pxyCfgs []v1.ProxyConfigurer
pm *proxy.Manager
// manage all visitors
vm *visitor.Manager
// control connection. Once conn is closed, the msgDispatcher and the entire Control will exit.
conn net.Conn
// use connector to create new connections, which could be real TCP connections or virtual streams.
connector Connector
doneCh chan struct{}
// of time.Time, last time got the Pong message
@ -76,50 +79,41 @@ type Control struct {
msgDispatcher *msg.Dispatcher
}
func NewControl(
ctx context.Context, runID string, conn net.Conn, connector Connector,
clientCfg *v1.ClientCommonConfig,
pxyCfgs []v1.ProxyConfigurer,
visitorCfgs []v1.VisitorConfigurer,
authSetter auth.Setter,
) (*Control, error) {
func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, error) {
// new xlog instance
ctl := &Control{
ctx: ctx,
xl: xlog.FromContextSafe(ctx),
clientCfg: clientCfg,
authSetter: authSetter,
runID: runID,
pxyCfgs: pxyCfgs,
conn: conn,
connector: connector,
sessionCtx: sessionCtx,
doneCh: make(chan struct{}),
}
ctl.lastPong.Store(time.Now())
cryptoRW, err := utilnet.NewCryptoReadWriter(conn, []byte(clientCfg.Auth.Token))
if sessionCtx.ConnEncrypted {
cryptoRW, err := netpkg.NewCryptoReadWriter(sessionCtx.Conn, []byte(sessionCtx.Common.Auth.Token))
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
} else {
ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, ctl.runID, ctl.clientCfg, ctl.connectServer, ctl.msgTransporter)
ctl.vm.Reload(visitorCfgs)
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)
return ctl, nil
}
func (ctl *Control) Run() {
func (ctl *Control) Run(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) {
go ctl.worker()
// start all proxies
ctl.pm.Reload(ctl.pxyCfgs)
ctl.pm.UpdateAll(proxyCfgs)
// start all visitors
go ctl.vm.Run()
ctl.vm.UpdateAll(visitorCfgs)
}
func (ctl *Control) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
@ -135,9 +129,9 @@ func (ctl *Control) handleReqWorkConn(_ msg.Message) {
}
m := &msg.NewWorkConn{
RunID: ctl.runID,
RunID: ctl.sessionCtx.RunID,
}
if err = ctl.authSetter.SetNewWorkConn(m); err != nil {
if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {
xl.Warn("error during NewWorkConn authentication: %v", err)
return
}
@ -193,13 +187,19 @@ func (ctl *Control) handlePong(m msg.Message) {
if inMsg.Error != "" {
xl.Error("Pong message contains error: %s", inMsg.Error)
ctl.conn.Close()
ctl.closeSession()
return
}
ctl.lastPong.Store(time.Now())
xl.Debug("receive heartbeat from server")
}
// closeSession closes the control connection.
func (ctl *Control) closeSession() {
ctl.sessionCtx.Conn.Close()
ctl.sessionCtx.Connector.Close()
}
func (ctl *Control) Close() error {
return ctl.GracefulClose(0)
}
@ -210,8 +210,7 @@ func (ctl *Control) GracefulClose(d time.Duration) error {
time.Sleep(d)
ctl.conn.Close()
ctl.connector.Close()
ctl.closeSession()
return nil
}
@ -221,8 +220,8 @@ func (ctl *Control) Done() <-chan struct{} {
}
// connectServer return a new connection to frps
func (ctl *Control) connectServer() (conn net.Conn, err error) {
return ctl.connector.Connect()
func (ctl *Control) connectServer() (net.Conn, error) {
return ctl.sessionCtx.Connector.Connect()
}
func (ctl *Control) registerMsgHandlers() {
@ -238,12 +237,12 @@ func (ctl *Control) heartbeatWorker() {
// TODO(fatedier): Change default value of HeartbeatInterval to -1 if tcpmux is enabled.
// Users can still enable heartbeat feature by setting HeartbeatInterval to a positive value.
if ctl.clientCfg.Transport.HeartbeatInterval > 0 {
if ctl.sessionCtx.Common.Transport.HeartbeatInterval > 0 {
// send heartbeat to server
sendHeartBeat := func() error {
xl.Debug("send heartbeat to server")
pingMsg := &msg.Ping{}
if err := ctl.authSetter.SetPing(pingMsg); err != nil {
if err := ctl.sessionCtx.AuthSetter.SetPing(pingMsg); err != nil {
xl.Warn("error during ping authentication: %v, skip sending ping message", err)
return err
}
@ -253,24 +252,24 @@ func (ctl *Control) heartbeatWorker() {
go wait.BackoffUntil(sendHeartBeat,
wait.NewFastBackoffManager(wait.FastBackoffOptions{
Duration: time.Duration(ctl.clientCfg.Transport.HeartbeatInterval) * time.Second,
Duration: time.Duration(ctl.sessionCtx.Common.Transport.HeartbeatInterval) * time.Second,
InitDurationIfFail: time.Second,
Factor: 2.0,
Jitter: 0.1,
MaxDuration: time.Duration(ctl.clientCfg.Transport.HeartbeatInterval) * time.Second,
MaxDuration: time.Duration(ctl.sessionCtx.Common.Transport.HeartbeatInterval) * time.Second,
}),
true, ctl.doneCh,
)
}
// Check heartbeat timeout only if TCPMux is not enabled and users don't disable heartbeat feature.
if ctl.clientCfg.Transport.HeartbeatInterval > 0 && ctl.clientCfg.Transport.HeartbeatTimeout > 0 &&
!lo.FromPtr(ctl.clientCfg.Transport.TCPMux) {
if ctl.sessionCtx.Common.Transport.HeartbeatInterval > 0 && ctl.sessionCtx.Common.Transport.HeartbeatTimeout > 0 &&
!lo.FromPtr(ctl.sessionCtx.Common.Transport.TCPMux) {
go wait.Until(func() {
if time.Since(ctl.lastPong.Load().(time.Time)) > time.Duration(ctl.clientCfg.Transport.HeartbeatTimeout)*time.Second {
if time.Since(ctl.lastPong.Load().(time.Time)) > time.Duration(ctl.sessionCtx.Common.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
ctl.conn.Close()
ctl.closeSession()
return
}
}, time.Second, ctl.doneCh)
@ -282,17 +281,15 @@ func (ctl *Control) worker() {
go ctl.msgDispatcher.Run()
<-ctl.msgDispatcher.Done()
ctl.conn.Close()
ctl.closeSession()
ctl.pm.Close()
ctl.vm.Close()
ctl.connector.Close()
close(ctl.doneCh)
}
func (ctl *Control) ReloadConf(pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
ctl.vm.Reload(visitorCfgs)
ctl.pm.Reload(pxyCfgs)
func (ctl *Control) UpdateAllConfigurer(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
ctl.vm.UpdateAll(visitorCfgs)
ctl.pm.UpdateAll(proxyCfgs)
return nil
}

View File

@ -120,9 +120,18 @@ func (pm *Manager) GetAllProxyStatus() []*WorkingStatus {
return ps
}
func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
func (pm *Manager) GetProxyStatus(name string) (*WorkingStatus, bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
if pxy, ok := pm.proxies[name]; ok {
return pxy.GetStatus(), true
}
return nil, false
}
func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {
xl := xlog.FromContextSafe(pm.ctx)
pxyCfgsMap := lo.KeyBy(pxyCfgs, func(c v1.ProxyConfigurer) string {
proxyCfgsMap := lo.KeyBy(proxyCfgs, func(c v1.ProxyConfigurer) string {
return c.GetBaseConfig().Name
})
pm.mu.Lock()
@ -131,7 +140,7 @@ func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgsMap[name]
cfg, ok := proxyCfgsMap[name]
if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
del = true
}
@ -147,7 +156,7 @@ func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
}
addPxyNames := make([]string, 0)
for _, cfg := range pxyCfgs {
for _, cfg := range proxyCfgs {
name := cfg.GetBaseConfig().Name
if _, ok := pm.proxies[name]; !ok {
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter)

View File

@ -31,7 +31,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -101,7 +101,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
workConn := conn
readCh := make(chan *msg.UDPPacket, 1024)

View File

@ -30,7 +30,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -112,7 +112,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
pxy.mu.Lock()
pxy.workConn = conn

View File

@ -29,7 +29,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -133,7 +133,7 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
}
defer lConn.Close()
remote, err := utilnet.NewKCPConnFromUDP(lConn, true, raddr.String())
remote, err := netpkg.NewKCPConnFromUDP(lConn, true, raddr.String())
if err != nil {
xl.Warn("create kcp connection from udp connection error: %v", err)
return
@ -194,6 +194,6 @@ func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, star
_ = c.CloseWithError(0, "")
return
}
go pxy.HandleTCPWorkConnection(utilnet.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Secretkey))
go pxy.HandleTCPWorkConnection(netpkg.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Secretkey))
}
}

View File

@ -20,18 +20,19 @@ import (
"fmt"
"net"
"runtime"
"strconv"
"sync"
"time"
"github.com/fatedier/golib/crypto"
"github.com/samber/lo"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/pkg/auth"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/log"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
@ -41,66 +42,106 @@ func init() {
crypto.DefaultSalt = "frp"
}
// Service is a client service.
type Service struct {
// uniq id got from frps, attach it in loginMsg
runID string
// ServiceOptions contains options for creating a new client service.
type ServiceOptions struct {
Common *v1.ClientCommonConfig
ProxyCfgs []v1.ProxyConfigurer
VisitorCfgs []v1.VisitorConfigurer
// ConfigFilePath is the path to the configuration file used to initialize.
// If it is empty, it means that the configuration file is not used for initialization.
// It may be initialized using command line parameters or called directly.
ConfigFilePath string
// ClientSpec is the client specification that control the client behavior.
ClientSpec *msg.ClientSpec
// ConnectorCreator is a function that creates a new connector to make connections to the server.
// The Connector shields the underlying connection details, whether it is through TCP or QUIC connection,
// and regardless of whether multiplexing is used.
//
// If it is not set, the default frpc connector will be used.
// By using a custom Connector, it can be used to implement a VirtualClient, which connects to frps
// through a pipe instead of a real physical connection.
ConnectorCreator func(context.Context, *v1.ClientCommonConfig) Connector
// HandleWorkConnCb is a callback function that is called when a new work connection is created.
//
// If it is not set, the default frpc implementation will be used.
HandleWorkConnCb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
}
// setServiceOptionsDefault sets the default values for ServiceOptions.
func setServiceOptionsDefault(options *ServiceOptions) {
if options.Common != nil {
options.Common.Complete()
}
if options.ConnectorCreator == nil {
options.ConnectorCreator = NewConnector
}
}
// Service is the client service that connects to frps and provides proxy services.
type Service struct {
ctlMu sync.RWMutex
// manager control connection with server
ctl *Control
ctlMu sync.RWMutex
// Uniq id got from frps, it will be attached to loginMsg.
runID string
// Sets authentication based on selected method
authSetter auth.Setter
cfg *v1.ClientCommonConfig
pxyCfgs []v1.ProxyConfigurer
visitorCfgs []v1.VisitorConfigurer
// web server for admin UI and apis
webServer *httppkg.Server
cfgMu sync.RWMutex
common *v1.ClientCommonConfig
proxyCfgs []v1.ProxyConfigurer
visitorCfgs []v1.VisitorConfigurer
clientSpec *msg.ClientSpec
// The configuration file used to initialize this client, or an empty
// string if no configuration file was used.
cfgFile string
configFilePath string
// service context
ctx context.Context
// call cancel to stop service
cancel context.CancelFunc
gracefulDuration time.Duration
gracefulShutdownDuration time.Duration
connectorCreator func(context.Context, *v1.ClientCommonConfig) Connector
inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
handleWorkConnCb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
}
func NewService(
cfg *v1.ClientCommonConfig,
pxyCfgs []v1.ProxyConfigurer,
visitorCfgs []v1.VisitorConfigurer,
cfgFile string,
) *Service {
return &Service{
authSetter: auth.NewAuthSetter(cfg.Auth),
cfg: cfg,
cfgFile: cfgFile,
pxyCfgs: pxyCfgs,
visitorCfgs: visitorCfgs,
func NewService(options ServiceOptions) (*Service, error) {
setServiceOptionsDefault(&options)
var webServer *httppkg.Server
if options.Common.WebServer.Port > 0 {
ws, err := httppkg.NewServer(options.Common.WebServer)
if err != nil {
return nil, err
}
webServer = ws
}
s := &Service{
ctx: context.Background(),
connectorCreator: NewConnector,
authSetter: auth.NewAuthSetter(options.Common.Auth),
webServer: webServer,
common: options.Common,
configFilePath: options.ConfigFilePath,
proxyCfgs: options.ProxyCfgs,
visitorCfgs: options.VisitorCfgs,
clientSpec: options.ClientSpec,
connectorCreator: options.ConnectorCreator,
handleWorkConnCb: options.HandleWorkConnCb,
}
if webServer != nil {
webServer.RouteRegister(s.registerRouteHandlers)
}
func (svr *Service) SetConnectorCreator(h func(context.Context, *v1.ClientCommonConfig) Connector) {
svr.connectorCreator = h
}
func (svr *Service) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
svr.inWorkConnCallback = cb
}
func (svr *Service) GetController() *Control {
svr.ctlMu.RLock()
defer svr.ctlMu.RUnlock()
return svr.ctl
return s, nil
}
func (svr *Service) Run(ctx context.Context) error {
@ -109,38 +150,25 @@ func (svr *Service) Run(ctx context.Context) error {
svr.cancel = cancel
// set custom DNSServer
if svr.cfg.DNSServer != "" {
dnsAddr := svr.cfg.DNSServer
if _, _, err := net.SplitHostPort(dnsAddr); err != nil {
dnsAddr = net.JoinHostPort(dnsAddr, "53")
}
// Change default dns server for frpc
net.DefaultResolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial("udp", dnsAddr)
},
}
if svr.common.DNSServer != "" {
netpkg.SetDefaultDNSAddress(svr.common.DNSServer)
}
// login to frps
svr.loopLoginUntilSuccess(10*time.Second, lo.FromPtr(svr.cfg.LoginFailExit))
// first login to frps
svr.loopLoginUntilSuccess(10*time.Second, lo.FromPtr(svr.common.LoginFailExit))
if svr.ctl == nil {
return fmt.Errorf("the process exited because the first login to the server failed, and the loginFailExit feature is enabled")
}
go svr.keepControllerWorking()
if svr.cfg.WebServer.Port != 0 {
// Init admin server assets
assets.Load(svr.cfg.WebServer.AssetsDir)
address := net.JoinHostPort(svr.cfg.WebServer.Addr, strconv.Itoa(svr.cfg.WebServer.Port))
err := svr.RunAdminServer(address)
if err != nil {
log.Warn("run admin server error: %v", err)
if svr.webServer != nil {
go func() {
log.Info("admin server listen on %s", svr.webServer.Address())
if err := svr.webServer.Run(); err != nil {
log.Warn("admin server exit with error: %v", err)
}
log.Info("admin server listen on %s:%d", svr.cfg.WebServer.Addr, svr.cfg.WebServer.Port)
}()
}
<-svr.ctx.Done()
svr.stop()
@ -158,8 +186,12 @@ func (svr *Service) keepControllerWorking() {
// loopLoginUntilSuccess is another layer of loop that will continuously attempt to
// login to the server until successful.
svr.loopLoginUntilSuccess(20*time.Second, false)
if svr.ctl != nil {
<-svr.ctl.Done()
return errors.New("control is closed and try another loop")
}
// If the control is nil, it means that the login failed and the service is also closed.
return nil
}, wait.NewFastBackoffManager(
wait.FastBackoffOptions{
Duration: time.Second,
@ -179,7 +211,7 @@ func (svr *Service) keepControllerWorking() {
// session: if it's not nil, using tcp mux
func (svr *Service) login() (conn net.Conn, connector Connector, err error) {
xl := xlog.FromContextSafe(svr.ctx)
connector = svr.connectorCreator(svr.ctx, svr.cfg)
connector = svr.connectorCreator(svr.ctx, svr.common)
if err = connector.Open(); err != nil {
return nil, nil, err
}
@ -198,12 +230,15 @@ func (svr *Service) login() (conn net.Conn, connector Connector, err error) {
loginMsg := &msg.Login{
Arch: runtime.GOARCH,
Os: runtime.GOOS,
PoolCount: svr.cfg.Transport.PoolCount,
User: svr.cfg.User,
PoolCount: svr.common.Transport.PoolCount,
User: svr.common.User,
Version: version.Full(),
Timestamp: time.Now().Unix(),
RunID: svr.runID,
Metas: svr.cfg.Metadatas,
Metas: svr.common.Metadatas,
}
if svr.clientSpec != nil {
loginMsg.ClientSpec = *svr.clientSpec
}
// Add auth
@ -250,16 +285,31 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
return err
}
ctl, err := NewControl(svr.ctx, svr.runID, conn, connector,
svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.authSetter)
svr.cfgMu.RLock()
proxyCfgs := svr.proxyCfgs
visitorCfgs := svr.visitorCfgs
svr.cfgMu.RUnlock()
connEncrypted := true
if svr.clientSpec != nil && svr.clientSpec.Type == "ssh-tunnel" {
connEncrypted = false
}
sessionCtx := &SessionContext{
Common: svr.common,
RunID: svr.runID,
Conn: conn,
ConnEncrypted: connEncrypted,
AuthSetter: svr.authSetter,
Connector: connector,
}
ctl, err := NewControl(svr.ctx, sessionCtx)
if err != nil {
conn.Close()
xl.Error("NewControl error: %v", err)
return err
}
ctl.SetInWorkConnCallback(svr.inWorkConnCallback)
ctl.SetInWorkConnCallback(svr.handleWorkConnCb)
ctl.Run()
ctl.Run(proxyCfgs, visitorCfgs)
// close and replace previous control
svr.ctlMu.Lock()
if svr.ctl != nil {
@ -284,9 +334,9 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
wait.MergeAndCloseOnAnyStopChannel(svr.ctx.Done(), successCh))
}
func (svr *Service) ReloadConf(pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
func (svr *Service) UpdateAllConfigurer(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
svr.cfgMu.Lock()
svr.pxyCfgs = pxyCfgs
svr.proxyCfgs = proxyCfgs
svr.visitorCfgs = visitorCfgs
svr.cfgMu.Unlock()
@ -295,7 +345,7 @@ func (svr *Service) ReloadConf(pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.Vi
svr.ctlMu.RUnlock()
if ctl != nil {
return svr.ctl.ReloadConf(pxyCfgs, visitorCfgs)
return svr.ctl.UpdateAllConfigurer(proxyCfgs, visitorCfgs)
}
return nil
}
@ -305,7 +355,7 @@ func (svr *Service) Close() {
}
func (svr *Service) GracefulClose(d time.Duration) {
svr.gracefulDuration = d
svr.gracefulShutdownDuration = d
svr.cancel()
}
@ -313,7 +363,23 @@ func (svr *Service) stop() {
svr.ctlMu.Lock()
defer svr.ctlMu.Unlock()
if svr.ctl != nil {
svr.ctl.GracefulClose(svr.gracefulDuration)
svr.ctl.GracefulClose(svr.gracefulShutdownDuration)
svr.ctl = nil
}
}
// TODO(fatedier): Use StatusExporter to provide query interfaces instead of directly using methods from the Service.
func (svr *Service) GetProxyStatus(name string) (*proxy.WorkingStatus, error) {
svr.ctlMu.RLock()
ctl := svr.ctl
svr.ctlMu.RUnlock()
if ctl == nil {
return nil, fmt.Errorf("control is not running")
}
ws, ok := ctl.pm.GetProxyStatus(name)
if !ok {
return nil, fmt.Errorf("proxy [%s] is not found", name)
}
return ws, nil
}

View File

@ -28,7 +28,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)
@ -242,7 +242,7 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
if sv.cfg.Transport.UseCompression {
remote = libio.WithCompression(remote)
}
return utilnet.WrapReadWriteCloserToConn(remote, visitorConn), nil
return netpkg.WrapReadWriteCloserToConn(remote, visitorConn), nil
}
func (sv *SUDPVisitor) Close() {

View File

@ -21,7 +21,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
@ -56,7 +56,7 @@ func NewVisitor(
clientCfg: clientCfg,
helper: helper,
ctx: xlog.NewContext(ctx, xl),
internalLn: utilnet.NewInternalListener(),
internalLn: netpkg.NewInternalListener(),
}
switch cfg := cfg.(type) {
case *v1.STCPVisitorConfig:
@ -84,7 +84,7 @@ type BaseVisitor struct {
clientCfg *v1.ClientCommonConfig
helper Helper
l net.Listener
internalLn *utilnet.InternalListener
internalLn *netpkg.InternalListener
mu sync.RWMutex
ctx context.Context

View File

@ -36,6 +36,7 @@ type Manager struct {
helper Helper
checkInterval time.Duration
keepVisitorsRunningOnce sync.Once
mu sync.RWMutex
ctx context.Context
@ -67,7 +68,9 @@ func NewManager(
return m
}
func (vm *Manager) Run() {
// keepVisitorsRunning checks all visitors' status periodically, if some visitor is not running, start it.
// It will only start after Reload is called and a new visitor is added.
func (vm *Manager) keepVisitorsRunning() {
xl := xlog.FromContextSafe(vm.ctx)
ticker := time.NewTicker(vm.checkInterval)
@ -76,7 +79,7 @@ func (vm *Manager) Run() {
for {
select {
case <-vm.stopCh:
xl.Info("gracefully shutdown visitor manager")
xl.Trace("gracefully shutdown visitor manager")
return
case <-ticker.C:
vm.mu.Lock()
@ -120,7 +123,14 @@ func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) {
return
}
func (vm *Manager) Reload(cfgs []v1.VisitorConfigurer) {
func (vm *Manager) UpdateAll(cfgs []v1.VisitorConfigurer) {
if len(cfgs) > 0 {
// Only start keepVisitorsRunning goroutine once and only when there is at least one visitor.
vm.keepVisitorsRunningOnce.Do(func() {
go vm.keepVisitorsRunning()
})
}
xl := xlog.FromContextSafe(vm.ctx)
cfgsMap := lo.KeyBy(cfgs, func(c v1.VisitorConfigurer) string {
return c.GetBaseConfig().Name

View File

@ -33,7 +33,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)
@ -349,7 +349,7 @@ func (ks *KCPTunnelSession) Init(listenConn *net.UDPConn, raddr *net.UDPAddr) er
if err != nil {
return fmt.Errorf("dial udp error: %v", err)
}
remote, err := utilnet.NewKCPConnFromUDP(lConn, true, raddr.String())
remote, err := netpkg.NewKCPConnFromUDP(lConn, true, raddr.String())
if err != nil {
return fmt.Errorf("create kcp connection from udp connection error: %v", err)
}
@ -440,7 +440,7 @@ func (qs *QUICTunnelSession) OpenConn(ctx context.Context) (net.Conn, error) {
if err != nil {
return nil, err
}
return utilnet.QuicStreamToNetConn(stream, session), nil
return netpkg.QuicStreamToNetConn(stream, session), nil
}
func (qs *QUICTunnelSession) Close() {

View File

@ -110,7 +110,7 @@ func handleTermSignal(svr *client.Service) {
}
func runClient(cfgFilePath string) error {
cfg, pxyCfgs, visitorCfgs, isLegacyFormat, err := config.LoadClientConfig(cfgFilePath, strictConfigMode)
cfg, proxyCfgs, visitorCfgs, isLegacyFormat, err := config.LoadClientConfig(cfgFilePath, strictConfigMode)
if err != nil {
return err
}
@ -119,19 +119,19 @@ func runClient(cfgFilePath string) error {
"please use yaml/json/toml format instead!\n")
}
warning, err := validation.ValidateAllClientConfig(cfg, pxyCfgs, visitorCfgs)
warning, err := validation.ValidateAllClientConfig(cfg, proxyCfgs, visitorCfgs)
if warning != nil {
fmt.Printf("WARNING: %v\n", warning)
}
if err != nil {
return err
}
return startService(cfg, pxyCfgs, visitorCfgs, cfgFilePath)
return startService(cfg, proxyCfgs, visitorCfgs, cfgFilePath)
}
func startService(
cfg *v1.ClientCommonConfig,
pxyCfgs []v1.ProxyConfigurer,
proxyCfgs []v1.ProxyConfigurer,
visitorCfgs []v1.VisitorConfigurer,
cfgFile string,
) error {
@ -141,7 +141,15 @@ func startService(
log.Info("start frpc service for config file [%s]", cfgFile)
defer log.Info("frpc service for config file [%s] stopped", cfgFile)
}
svr := client.NewService(cfg, pxyCfgs, visitorCfgs, cfgFile)
svr, err := client.NewService(client.ServiceOptions{
Common: cfg,
ProxyCfgs: proxyCfgs,
VisitorCfgs: visitorCfgs,
ConfigFilePath: cfgFile,
})
if err != nil {
return err
}
shouldGracefulClose := cfg.Transport.Protocol == "kcp" || cfg.Transport.Protocol == "quic"
// Capture the exit signal if we use kcp or quic.

View File

@ -37,12 +37,12 @@ var verifyCmd = &cobra.Command{
return nil
}
cliCfg, pxyCfgs, visitorCfgs, _, err := config.LoadClientConfig(cfgFile, strictConfigMode)
cliCfg, proxyCfgs, visitorCfgs, _, err := config.LoadClientConfig(cfgFile, strictConfigMode)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
warning, err := validation.ValidateAllClientConfig(cliCfg, pxyCfgs, visitorCfgs)
warning, err := validation.ValidateAllClientConfig(cliCfg, proxyCfgs, visitorCfgs)
if warning != nil {
fmt.Printf("WARNING: %v\n", warning)
}

31
pkg/auth/pass.go Normal file
View File

@ -0,0 +1,31 @@
// Copyright 2023 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"github.com/fatedier/frp/pkg/msg"
)
var AlwaysPassVerifier = &alwaysPass{}
var _ Verifier = &alwaysPass{}
type alwaysPass struct{}
func (*alwaysPass) VerifyLogin(*msg.Login) error { return nil }
func (*alwaysPass) VerifyPing(*msg.Ping) error { return nil }
func (*alwaysPass) VerifyNewWorkConn(*msg.NewWorkConn) error { return nil }

View File

@ -59,12 +59,17 @@ func RegisterProxyFlags(cmd *cobra.Command, c v1.ProxyConfigurer) {
case *v1.TCPMuxProxyConfig:
registerProxyDomainConfigFlags(cmd, &cc.DomainConfig)
cmd.Flags().StringVarP(&cc.Multiplexer, "mux", "", "", "multiplexer")
cmd.Flags().StringVarP(&cc.HTTPUser, "http_user", "", "", "http auth user")
cmd.Flags().StringVarP(&cc.HTTPPassword, "http_pwd", "", "", "http auth password")
case *v1.STCPProxyConfig:
cmd.Flags().StringVarP(&cc.Secretkey, "sk", "", "", "secret key")
cmd.Flags().StringSliceVarP(&cc.AllowUsers, "allow_users", "", []string{}, "allow visitor users")
case *v1.SUDPProxyConfig:
cmd.Flags().StringVarP(&cc.Secretkey, "sk", "", "", "secret key")
cmd.Flags().StringSliceVarP(&cc.AllowUsers, "allow_users", "", []string{}, "allow visitor users")
case *v1.XTCPProxyConfig:
cmd.Flags().StringVarP(&cc.Secretkey, "sk", "", "", "secret key")
cmd.Flags().StringSliceVarP(&cc.AllowUsers, "allow_users", "", []string{}, "allow visitor users")
}
}

View File

@ -23,7 +23,7 @@ import (
func ParseClientConfig(filePath string) (
cfg ClientCommonConf,
pxyCfgs map[string]ProxyConf,
proxyCfgs map[string]ProxyConf,
visitorCfgs map[string]VisitorConf,
err error,
) {
@ -56,7 +56,7 @@ func ParseClientConfig(filePath string) (
configBuffer.Write(buf)
// Parse all proxy and visitor configs.
pxyCfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
proxyCfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {
return
}

View File

@ -110,6 +110,7 @@ func LoadConfigureFromFile(path string, c any, strict bool) error {
// LoadConfigure loads configuration from bytes and unmarshal into c.
// Now it supports json, yaml and toml format.
// TODO(fatedier): strict is not valide for ProxyConfigurer/VisitorConfigurer/ClientPluginOptions.
func LoadConfigure(b []byte, c any, strict bool) error {
var tomlObj interface{}
// Try to unmarshal as TOML first; swallow errors from that (assume it's not valid TOML).
@ -188,19 +189,19 @@ func LoadClientConfig(path string, strict bool) (
) {
var (
cliCfg *v1.ClientCommonConfig
pxyCfgs = make([]v1.ProxyConfigurer, 0)
proxyCfgs = make([]v1.ProxyConfigurer, 0)
visitorCfgs = make([]v1.VisitorConfigurer, 0)
isLegacyFormat bool
)
if DetectLegacyINIFormatFromFile(path) {
legacyCommon, legacyPxyCfgs, legacyVisitorCfgs, err := legacy.ParseClientConfig(path)
legacyCommon, legacyProxyCfgs, legacyVisitorCfgs, err := legacy.ParseClientConfig(path)
if err != nil {
return nil, nil, nil, true, err
}
cliCfg = legacy.Convert_ClientCommonConf_To_v1(&legacyCommon)
for _, c := range legacyPxyCfgs {
pxyCfgs = append(pxyCfgs, legacy.Convert_ProxyConf_To_v1(c))
for _, c := range legacyProxyCfgs {
proxyCfgs = append(proxyCfgs, legacy.Convert_ProxyConf_To_v1(c))
}
for _, c := range legacyVisitorCfgs {
visitorCfgs = append(visitorCfgs, legacy.Convert_VisitorConf_To_v1(c))
@ -213,7 +214,7 @@ func LoadClientConfig(path string, strict bool) (
}
cliCfg = &allCfg.ClientCommonConfig
for _, c := range allCfg.Proxies {
pxyCfgs = append(pxyCfgs, c.ProxyConfigurer)
proxyCfgs = append(proxyCfgs, c.ProxyConfigurer)
}
for _, c := range allCfg.Visitors {
visitorCfgs = append(visitorCfgs, c.VisitorConfigurer)
@ -223,18 +224,18 @@ func LoadClientConfig(path string, strict bool) (
// Load additional config from includes.
// legacy ini format already handle this in ParseClientConfig.
if len(cliCfg.IncludeConfigFiles) > 0 && !isLegacyFormat {
extPxyCfgs, extVisitorCfgs, err := LoadAdditionalClientConfigs(cliCfg.IncludeConfigFiles, isLegacyFormat, strict)
extProxyCfgs, extVisitorCfgs, err := LoadAdditionalClientConfigs(cliCfg.IncludeConfigFiles, isLegacyFormat, strict)
if err != nil {
return nil, nil, nil, isLegacyFormat, err
}
pxyCfgs = append(pxyCfgs, extPxyCfgs...)
proxyCfgs = append(proxyCfgs, extProxyCfgs...)
visitorCfgs = append(visitorCfgs, extVisitorCfgs...)
}
// Filter by start
if len(cliCfg.Start) > 0 {
startSet := sets.New(cliCfg.Start...)
pxyCfgs = lo.Filter(pxyCfgs, func(c v1.ProxyConfigurer, _ int) bool {
proxyCfgs = lo.Filter(proxyCfgs, func(c v1.ProxyConfigurer, _ int) bool {
return startSet.Has(c.GetBaseConfig().Name)
})
visitorCfgs = lo.Filter(visitorCfgs, func(c v1.VisitorConfigurer, _ int) bool {
@ -245,17 +246,17 @@ func LoadClientConfig(path string, strict bool) (
if cliCfg != nil {
cliCfg.Complete()
}
for _, c := range pxyCfgs {
for _, c := range proxyCfgs {
c.Complete(cliCfg.User)
}
for _, c := range visitorCfgs {
c.Complete(cliCfg)
}
return cliCfg, pxyCfgs, visitorCfgs, isLegacyFormat, nil
return cliCfg, proxyCfgs, visitorCfgs, isLegacyFormat, nil
}
func LoadAdditionalClientConfigs(paths []string, isLegacyFormat bool, strict bool) ([]v1.ProxyConfigurer, []v1.VisitorConfigurer, error) {
pxyCfgs := make([]v1.ProxyConfigurer, 0)
proxyCfgs := make([]v1.ProxyConfigurer, 0)
visitorCfgs := make([]v1.VisitorConfigurer, 0)
for _, path := range paths {
absDir, err := filepath.Abs(filepath.Dir(path))
@ -281,7 +282,7 @@ func LoadAdditionalClientConfigs(paths []string, isLegacyFormat bool, strict boo
return nil, nil, fmt.Errorf("load additional config from %s error: %v", absFile, err)
}
for _, c := range cfg.Proxies {
pxyCfgs = append(pxyCfgs, c.ProxyConfigurer)
proxyCfgs = append(proxyCfgs, c.ProxyConfigurer)
}
for _, c := range cfg.Visitors {
visitorCfgs = append(visitorCfgs, c.VisitorConfigurer)
@ -289,5 +290,5 @@ func LoadAdditionalClientConfigs(paths []string, isLegacyFormat bool, strict boo
}
}
}
return pxyCfgs, visitorCfgs, nil
return proxyCfgs, visitorCfgs, nil
}

View File

@ -224,7 +224,9 @@ func NewProxyConfigurerByType(proxyType ProxyType) ProxyConfigurer {
if !ok {
return nil
}
return reflect.New(v).Interface().(ProxyConfigurer)
pc := reflect.New(v).Interface().(ProxyConfigurer)
pc.GetBaseConfig().Type = string(proxyType)
return pc
}
var _ ProxyConfigurer = &TCPProxyConfig{}

View File

@ -80,7 +80,7 @@ func ValidateClientCommonConfig(c *v1.ClientCommonConfig) (Warning, error) {
return warnings, errs
}
func ValidateAllClientConfig(c *v1.ClientCommonConfig, pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) (Warning, error) {
func ValidateAllClientConfig(c *v1.ClientCommonConfig, proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) (Warning, error) {
var warnings Warning
if c != nil {
warning, err := ValidateClientCommonConfig(c)
@ -90,7 +90,7 @@ func ValidateAllClientConfig(c *v1.ClientCommonConfig, pxyCfgs []v1.ProxyConfigu
}
}
for _, c := range pxyCfgs {
for _, c := range proxyCfgs {
if err := ValidateProxyConfigurerForClient(c); err != nil {
return warnings, fmt.Errorf("proxy %s: %v", c.GetBaseConfig().Name, err)
}

View File

@ -63,6 +63,15 @@ var msgTypeMap = map[byte]interface{}{
var TypeNameNatHoleResp = reflect.TypeOf(&NatHoleResp{}).Elem().Name()
type ClientSpec struct {
// Due to the support of VirtualClient, frps needs to know the client type in order to
// differentiate the processing logic.
// Optional values: ssh-tunnel
Type string `json:"type,omitempty"`
// If the value is true, the client will not require authentication.
AlwaysAuthPass bool `json:"always_auth_pass,omitempty"`
}
// When frpc start, client send this message to login to server.
type Login struct {
Version string `json:"version,omitempty"`
@ -75,6 +84,9 @@ type Login struct {
RunID string `json:"run_id,omitempty"`
Metas map[string]string `json:"metas,omitempty"`
// Currently only effective for VirtualClient.
ClientSpec ClientSpec `json:"client_spec,omitempty"`
// Some global configures.
PoolCount int `json:"pool_count,omitempty"`
}

View File

@ -24,7 +24,7 @@ import (
"net/http/httputil"
v1 "github.com/fatedier/frp/pkg/config/v1"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -79,7 +79,7 @@ func NewHTTP2HTTPSPlugin(options v1.ClientPluginOptions) (Plugin, error) {
}
func (p *HTTP2HTTPSPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@ -29,7 +29,7 @@ import (
libnet "github.com/fatedier/golib/net"
v1 "github.com/fatedier/frp/pkg/config/v1"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
)
@ -68,7 +68,7 @@ func (hp *HTTPProxy) Name() string {
}
func (hp *HTTPProxy) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
sc, rd := libnet.NewSharedConn(wrapConn)
firstBytes := make([]byte, 7)

View File

@ -26,7 +26,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -98,7 +98,7 @@ func (p *HTTPS2HTTPPlugin) genTLSConfig() (*tls.Config, error) {
}
func (p *HTTPS2HTTPPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@ -26,7 +26,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -104,7 +104,7 @@ func (p *HTTPS2HTTPSPlugin) genTLSConfig() (*tls.Config, error) {
}
func (p *HTTPS2HTTPSPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@ -24,7 +24,7 @@ import (
gosocks5 "github.com/armon/go-socks5"
v1 "github.com/fatedier/frp/pkg/config/v1"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -52,7 +52,7 @@ func NewSocks5Plugin(options v1.ClientPluginOptions) (p Plugin, err error) {
func (sp *Socks5Plugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
defer conn.Close()
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
_ = sp.Server.ServeConn(wrapConn)
}

View File

@ -25,7 +25,7 @@ import (
"github.com/gorilla/mux"
v1 "github.com/fatedier/frp/pkg/config/v1"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
func init() {
@ -57,8 +57,8 @@ func NewStaticFilePlugin(options v1.ClientPluginOptions) (Plugin, error) {
}
router := mux.NewRouter()
router.Use(utilnet.NewHTTPAuthMiddleware(opts.HTTPUser, opts.HTTPPassword).SetAuthFailDelay(200 * time.Millisecond).Middleware)
router.PathPrefix(prefix).Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix(prefix, http.FileServer(http.Dir(opts.LocalPath))))).Methods("GET")
router.Use(netpkg.NewHTTPAuthMiddleware(opts.HTTPUser, opts.HTTPPassword).SetAuthFailDelay(200 * time.Millisecond).Middleware)
router.PathPrefix(prefix).Handler(netpkg.MakeHTTPGzipHandler(http.StripPrefix(prefix, http.FileServer(http.Dir(opts.LocalPath))))).Methods("GET")
sp.s = &http.Server{
Handler: router,
}
@ -69,7 +69,7 @@ func NewStaticFilePlugin(options v1.ClientPluginOptions) (Plugin, error) {
}
func (sp *StaticFilePlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn)
_ = sp.l.PutConn(wrapConn)
}

View File

@ -11,7 +11,7 @@ import (
"strings"
"github.com/fatedier/frp/client"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
)
type Client struct {
@ -115,7 +115,7 @@ func (c *Client) UpdateConfig(content string) error {
func (c *Client) setAuthHeader(req *http.Request) {
if c.authUser != "" || c.authPwd != "" {
req.Header.Set("Authorization", util.BasicAuth(c.authUser, c.authPwd))
req.Header.Set("Authorization", httppkg.BasicAuth(c.authUser, c.authPwd))
}
}

View File

@ -26,21 +26,21 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
type Gateway struct {
bindPort int
ln net.Listener
serverPeerListener *utilnet.InternalListener
peerServerListener *netpkg.InternalListener
sshConfig *ssh.ServerConfig
}
func NewGateway(
cfg v1.SSHTunnelGateway, bindAddr string,
serverPeerListener *utilnet.InternalListener,
peerServerListener *netpkg.InternalListener,
) (*Gateway, error) {
sshConfig := &ssh.ServerConfig{}
@ -71,15 +71,8 @@ func NewGateway(
}
sshConfig.AddHostKey(privateKey)
sshConfig.NoClientAuth = cfg.AuthorizedKeysFile == ""
sshConfig.PublicKeyCallback = func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
if cfg.AuthorizedKeysFile == "" {
return &ssh.Permissions{
Extensions: map[string]string{
"user": "",
},
}, nil
}
authorizedKeysMap, err := loadAuthorizedKeysFromFile(cfg.AuthorizedKeysFile)
if err != nil {
return nil, fmt.Errorf("internal error")
@ -103,7 +96,7 @@ func NewGateway(
return &Gateway{
bindPort: cfg.BindPort,
ln: ln,
serverPeerListener: serverPeerListener,
peerServerListener: peerServerListener,
sshConfig: sshConfig,
}, nil
}
@ -121,7 +114,7 @@ func (g *Gateway) Run() {
func (g *Gateway) handleConn(conn net.Conn) {
defer conn.Close()
ts, err := NewTunnelServer(conn, g.sshConfig, g.serverPeerListener)
ts, err := NewTunnelServer(conn, g.sshConfig, g.peerServerListener)
if err != nil {
return
}

View File

@ -17,9 +17,11 @@ package ssh
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
libio "github.com/fatedier/golib/io"
@ -27,10 +29,12 @@ import (
"github.com/spf13/cobra"
"golang.org/x/crypto/ssh"
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
utilnet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/log"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/virtual"
@ -64,15 +68,16 @@ type TunnelServer struct {
sc *ssh.ServerConfig
vc *virtual.Client
serverPeerListener *utilnet.InternalListener
peerServerListener *netpkg.InternalListener
doneCh chan struct{}
closeDoneChOnce sync.Once
}
func NewTunnelServer(conn net.Conn, sc *ssh.ServerConfig, serverPeerListener *utilnet.InternalListener) (*TunnelServer, error) {
func NewTunnelServer(conn net.Conn, sc *ssh.ServerConfig, peerServerListener *netpkg.InternalListener) (*TunnelServer, error) {
s := &TunnelServer{
underlyingConn: conn,
sc: sc,
serverPeerListener: serverPeerListener,
peerServerListener: peerServerListener,
doneCh: make(chan struct{}),
}
return s, nil
@ -94,19 +99,35 @@ func (s *TunnelServer) Run() error {
if err != nil {
return err
}
clientCfg.Complete()
if sshConn.Permissions != nil {
clientCfg.User = util.EmptyOr(sshConn.Permissions.Extensions["user"], clientCfg.User)
}
pc.Complete(clientCfg.User)
s.vc = virtual.NewClient(clientCfg)
vc, err := virtual.NewClient(virtual.ClientOptions{
Common: clientCfg,
Spec: &msg.ClientSpec{
Type: "ssh-tunnel",
// If ssh does not require authentication, then the virtual client needs to authenticate through a token.
// Otherwise, once ssh authentication is passed, the virtual client does not need to authenticate again.
AlwaysAuthPass: !s.sc.NoClientAuth,
},
HandleWorkConnCb: func(base *v1.ProxyBaseConfig, workConn net.Conn, m *msg.StartWorkConn) bool {
// join workConn and ssh channel
s.vc.SetInWorkConnCallback(func(base *v1.ProxyBaseConfig, workConn net.Conn, m *msg.StartWorkConn) bool {
c, err := s.openConn(addr)
if err != nil {
return false
}
libio.Join(c, workConn)
return false
},
})
if err != nil {
return err
}
s.vc = vc
// transfer connection from virtual client to server peer listener
go func() {
l := s.vc.PeerListener()
@ -115,21 +136,35 @@ func (s *TunnelServer) Run() error {
if err != nil {
return
}
_ = s.serverPeerListener.PutConn(conn)
_ = s.peerServerListener.PutConn(conn)
}
}()
xl := xlog.New().AddPrefix(xlog.LogPrefix{Name: "sshVirtualClient", Value: "sshVirtualClient", Priority: 100})
ctx := xlog.NewContext(context.Background(), xl)
go func() {
_ = s.vc.Run(ctx)
// If vc.Run returns, it means that the virtual client has been closed, and the ssh tunnel connection should be closed.
// One scenario is that the virtual client exits due to login failure.
s.closeDoneChOnce.Do(func() {
_ = sshConn.Close()
close(s.doneCh)
})
}()
s.vc.UpdateProxyConfigurer([]v1.ProxyConfigurer{pc})
if err := s.waitProxyStatusReady(pc.GetBaseConfig().Name, time.Second); err != nil {
log.Warn("wait proxy status ready error: %v", err)
} else {
_ = sshConn.Wait()
_ = sshConn.Close()
}
s.vc.Close()
log.Trace("ssh tunnel connection from %v closed", sshConn.RemoteAddr())
s.closeDoneChOnce.Do(func() {
_ = sshConn.Close()
close(s.doneCh)
})
return nil
}
@ -217,6 +252,14 @@ func (s *TunnelServer) parseClientAndProxyConfigurer(_ *tcpipForward, extraPaylo
if err := cmd.ParseFlags(args); err != nil {
return nil, nil, fmt.Errorf("parse flags from ssh client error: %v", err)
}
// if name is not set, generate a random one
if pc.GetBaseConfig().Name == "" {
id, err := util.RandIDWithLen(8)
if err != nil {
return nil, nil, fmt.Errorf("generate random id error: %v", err)
}
pc.GetBaseConfig().Name = fmt.Sprintf("sshtunnel-%s-%s", proxyType, id)
}
return &clientCfg, pc, nil
}
@ -274,6 +317,34 @@ func (s *TunnelServer) openConn(addr *tcpipForward) (net.Conn, error) {
}
go ssh.DiscardRequests(reqs)
conn := utilnet.WrapReadWriteCloserToConn(channel, s.underlyingConn)
conn := netpkg.WrapReadWriteCloserToConn(channel, s.underlyingConn)
return conn, nil
}
func (s *TunnelServer) waitProxyStatusReady(name string, timeout time.Duration) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
ps, err := s.vc.Service().GetProxyStatus(name)
if err != nil {
continue
}
switch ps.Phase {
case proxy.ProxyPhaseRunning:
return nil
case proxy.ProxyPhaseStartErr, proxy.ProxyPhaseClosed:
return errors.New(ps.Err)
}
case <-timer.C:
return fmt.Errorf("wait proxy status ready timeout")
case <-s.doneCh:
return fmt.Errorf("ssh tunnel server closed")
}
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package util
package http
import (
"encoding/base64"

128
pkg/util/http/server.go Normal file
View File

@ -0,0 +1,128 @@
// Copyright 2023 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"crypto/tls"
"net"
"net/http"
"net/http/pprof"
"strconv"
"time"
"github.com/gorilla/mux"
"github.com/fatedier/frp/assets"
v1 "github.com/fatedier/frp/pkg/config/v1"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
var (
defaultReadTimeout = 60 * time.Second
defaultWriteTimeout = 60 * time.Second
)
type Server struct {
addr string
ln net.Listener
tlsCfg *tls.Config
router *mux.Router
hs *http.Server
authMiddleware mux.MiddlewareFunc
}
func NewServer(cfg v1.WebServerConfig) (*Server, error) {
if cfg.AssetsDir != "" {
assets.Load(cfg.AssetsDir)
}
addr := net.JoinHostPort(cfg.Addr, strconv.Itoa(cfg.Port))
if addr == ":" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
router := mux.NewRouter()
hs := &http.Server{
Addr: addr,
Handler: router,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
}
s := &Server{
addr: addr,
ln: ln,
hs: hs,
router: router,
}
if cfg.PprofEnable {
s.registerPprofHandlers()
}
if cfg.TLS != nil {
cert, err := tls.LoadX509KeyPair(cfg.TLS.CertFile, cfg.TLS.KeyFile)
if err != nil {
return nil, err
}
s.tlsCfg = &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
s.authMiddleware = netpkg.NewHTTPAuthMiddleware(cfg.User, cfg.Password).SetAuthFailDelay(200 * time.Millisecond).Middleware
return s, nil
}
func (s *Server) Address() string {
return s.addr
}
func (s *Server) Run() error {
ln := s.ln
if s.tlsCfg != nil {
ln = tls.NewListener(ln, s.tlsCfg)
}
return s.hs.Serve(ln)
}
func (s *Server) Close() error {
return s.hs.Close()
}
type RouterRegisterHelper struct {
Router *mux.Router
AssetsFS http.FileSystem
AuthMiddleware mux.MiddlewareFunc
}
func (s *Server) RouteRegister(register func(helper *RouterRegisterHelper)) {
register(&RouterRegisterHelper{
Router: s.router,
AssetsFS: assets.FileSystem,
AuthMiddleware: s.authMiddleware,
})
}
func (s *Server) registerPprofHandlers() {
s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
s.router.HandleFunc("/debug/pprof/profile", pprof.Profile)
s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
s.router.HandleFunc("/debug/pprof/trace", pprof.Trace)
s.router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}

33
pkg/util/net/dns.go Normal file
View File

@ -0,0 +1,33 @@
// Copyright 2023 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package net
import (
"context"
"net"
)
func SetDefaultDNSAddress(dnsAddress string) {
if _, _, err := net.SplitHostPort(dnsAddress); err != nil {
dnsAddress = net.JoinHostPort(dnsAddress, "53")
}
// Change default dns server
net.DefaultResolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial("udp", dnsAddress)
},
}
}

View File

@ -52,7 +52,10 @@ func (l *InternalListener) PutConn(conn net.Conn) error {
conn.Close()
}
})
return err
if err != nil {
return fmt.Errorf("put conn error: listener is closed")
}
return nil
}
func (l *InternalListener) Close() error {

View File

@ -24,7 +24,7 @@ import (
libnet "github.com/fatedier/golib/net"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/vhost"
)
@ -59,10 +59,10 @@ func (muxer *HTTPConnectTCPMuxer) readHTTPConnectRequest(rd io.Reader) (host, ht
return
}
host, _ = util.CanonicalHost(req.Host)
host, _ = httppkg.CanonicalHost(req.Host)
proxyAuth := req.Header.Get("Proxy-Authorization")
if proxyAuth != "" {
httpUser, httpPwd, _ = util.ParseBasicAuth(proxyAuth)
httpUser, httpPwd, _ = httppkg.ParseBasicAuth(proxyAuth)
}
return
}
@ -71,7 +71,7 @@ func (muxer *HTTPConnectTCPMuxer) sendConnectResponse(c net.Conn, _ map[string]s
if muxer.passthrough {
return nil
}
res := util.OkResponse()
res := httppkg.OkResponse()
if res.Body != nil {
defer res.Body.Close()
}
@ -85,7 +85,7 @@ func (muxer *HTTPConnectTCPMuxer) auth(c net.Conn, username, password string, re
return true, nil
}
resp := util.ProxyUnauthorizedResponse()
resp := httppkg.ProxyUnauthorizedResponse()
if resp.Body != nil {
defer resp.Body.Close()
}

View File

@ -31,8 +31,8 @@ import (
libio "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
frpLog "github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
logpkg "github.com/fatedier/frp/pkg/util/log"
)
var ErrNoRouteFound = errors.New("no route found")
@ -61,7 +61,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
Director: func(req *http.Request) {
req.URL.Scheme = "http"
reqRouteInfo := req.Context().Value(RouteInfoKey).(*RequestRouteInfo)
oldHost, _ := util.CanonicalHost(reqRouteInfo.Host)
oldHost, _ := httppkg.CanonicalHost(reqRouteInfo.Host)
rc := rp.GetRouteConfig(oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
if rc != nil {
@ -74,7 +74,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
// ignore error here, it will use CreateConnFn instead later
endpoint, _ = rc.ChooseEndpointFn()
reqRouteInfo.Endpoint = endpoint
frpLog.Trace("choose endpoint name [%s] for http request host [%s] path [%s] httpuser [%s]",
logpkg.Trace("choose endpoint name [%s] for http request host [%s] path [%s] httpuser [%s]",
endpoint, oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
}
// Set {domain}.{location}.{routeByHTTPUser}.{endpoint} as URL host here to let http transport reuse connections.
@ -116,7 +116,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
BufferPool: newWrapPool(),
ErrorLog: log.New(newWrapLogger(), "", 0),
ErrorHandler: func(rw http.ResponseWriter, req *http.Request, err error) {
frpLog.Warn("do http proxy request [host: %s] error: %v", req.Host, err)
logpkg.Warn("do http proxy request [host: %s] error: %v", req.Host, err)
rw.WriteHeader(http.StatusNotFound)
_, _ = rw.Write(getNotFoundPageContent())
},
@ -143,7 +143,7 @@ func (rp *HTTPReverseProxy) UnRegister(routeCfg RouteConfig) {
func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser string) *RouteConfig {
vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
if ok {
frpLog.Debug("get new HTTP request host [%s] path [%s] httpuser [%s]", domain, location, routeByHTTPUser)
logpkg.Debug("get new HTTP request host [%s] path [%s] httpuser [%s]", domain, location, routeByHTTPUser)
return vr.payload.(*RouteConfig)
}
return nil
@ -159,7 +159,7 @@ func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string)
// CreateConnection create a new connection by route config
func (rp *HTTPReverseProxy) CreateConnection(reqRouteInfo *RequestRouteInfo, byEndpoint bool) (net.Conn, error) {
host, _ := util.CanonicalHost(reqRouteInfo.Host)
host, _ := httppkg.CanonicalHost(reqRouteInfo.Host)
vr, ok := rp.getVhost(host, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
if ok {
if byEndpoint {
@ -303,7 +303,7 @@ func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Requ
}
func (rp *HTTPReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
domain, _ := util.CanonicalHost(req.Host)
domain, _ := httppkg.CanonicalHost(req.Host)
location := req.URL.Path
user, passwd, _ := req.BasicAuth()
if !rp.CheckAuth(domain, location, user, user, passwd) {
@ -333,6 +333,6 @@ type wrapLogger struct{}
func newWrapLogger() *wrapLogger { return &wrapLogger{} }
func (l *wrapLogger) Write(p []byte) (n int, err error) {
frpLog.Warn("%s", string(bytes.TrimRight(p, "\n")))
logpkg.Warn("%s", string(bytes.TrimRight(p, "\n")))
return len(p), nil
}

View File

@ -20,7 +20,7 @@ import (
"net/http"
"os"
frpLog "github.com/fatedier/frp/pkg/util/log"
logpkg "github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/pkg/util/version"
)
@ -58,7 +58,7 @@ func getNotFoundPageContent() []byte {
if NotFoundPagePath != "" {
buf, err = os.ReadFile(NotFoundPagePath)
if err != nil {
frpLog.Warn("read custom 404 page error: %v", err)
logpkg.Warn("read custom 404 page error: %v", err)
buf = []byte(NotFound)
}
} else {

View File

@ -22,7 +22,7 @@ import (
"github.com/fatedier/golib/errors"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
@ -284,7 +284,7 @@ func (l *Listener) Accept() (net.Conn, error) {
xl.Debug("rewrite host to [%s] success", l.rewriteHost)
conn = sConn
}
return utilnet.NewContextConn(l.ctx, conn), nil
return netpkg.NewContextConn(l.ctx, conn), nil
}
func (l *Listener) Close() error {

View File

@ -21,55 +21,70 @@ import (
"github.com/fatedier/frp/client"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
type ClientOptions struct {
Common *v1.ClientCommonConfig
Spec *msg.ClientSpec
HandleWorkConnCb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
}
type Client struct {
l *utilnet.InternalListener
l *netpkg.InternalListener
svr *client.Service
}
func NewClient(cfg *v1.ClientCommonConfig) *Client {
cfg.Complete()
func NewClient(options ClientOptions) (*Client, error) {
if options.Common != nil {
options.Common.Complete()
}
ln := utilnet.NewInternalListener()
ln := netpkg.NewInternalListener()
svr := client.NewService(cfg, nil, nil, "")
svr.SetConnectorCreator(func(context.Context, *v1.ClientCommonConfig) client.Connector {
serviceOptions := client.ServiceOptions{
Common: options.Common,
ClientSpec: options.Spec,
ConnectorCreator: func(context.Context, *v1.ClientCommonConfig) client.Connector {
return &pipeConnector{
peerListener: ln,
}
})
},
HandleWorkConnCb: options.HandleWorkConnCb,
}
svr, err := client.NewService(serviceOptions)
if err != nil {
return nil, err
}
return &Client{
l: ln,
svr: svr,
}
}, nil
}
func (c *Client) PeerListener() net.Listener {
return c.l
}
func (c *Client) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
c.svr.SetInWorkConnCallback(cb)
}
func (c *Client) UpdateProxyConfigurer(proxyCfgs []v1.ProxyConfigurer) {
_ = c.svr.ReloadConf(proxyCfgs, nil)
_ = c.svr.UpdateAllConfigurer(proxyCfgs, nil)
}
func (c *Client) Run(ctx context.Context) error {
return c.svr.Run(ctx)
}
func (c *Client) Service() *client.Service {
return c.svr
}
func (c *Client) Close() {
c.l.Close()
c.svr.Close()
c.l.Close()
}
type pipeConnector struct {
peerListener *utilnet.InternalListener
peerListener *netpkg.InternalListener
}
func (pc *pipeConnector) Open() error {

View File

@ -32,7 +32,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
@ -150,6 +150,7 @@ type Control struct {
doneCh chan struct{}
}
// TODO(fatedier): Referencing the implementation of frpc, encapsulate the input parameters as SessionContext.
func NewControl(
ctx context.Context,
rc *controller.ResourceController,
@ -157,6 +158,7 @@ func NewControl(
pluginManager *plugin.Manager,
authVerifier auth.Verifier,
ctlConn net.Conn,
ctlConnEncrypted bool,
loginMsg *msg.Login,
serverCfg *v1.ServerConfig,
) (*Control, error) {
@ -183,11 +185,15 @@ func NewControl(
}
ctl.lastPing.Store(time.Now())
cryptoRW, err := utilnet.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if ctlConnEncrypted {
cryptoRW, err := netpkg.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
} else {
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
return ctl, nil
@ -300,6 +306,7 @@ func (ctl *Control) heartbeatWorker() {
go wait.Until(func() {
if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
ctl.conn.Close()
return
}
}, time.Second, ctl.doneCh)
@ -555,6 +562,5 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
go func() {
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
return
}

View File

@ -1,99 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"crypto/tls"
"net"
"net/http"
"net/http/pprof"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/fatedier/frp/assets"
utilnet "github.com/fatedier/frp/pkg/util/net"
)
var (
httpServerReadTimeout = 60 * time.Second
httpServerWriteTimeout = 60 * time.Second
)
func (svr *Service) RunDashboardServer(address string) (err error) {
// url router
router := mux.NewRouter()
router.HandleFunc("/healthz", svr.Healthz)
// debug
if svr.cfg.WebServer.PprofEnable {
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}
subRouter := router.NewRoute().Subrouter()
user, passwd := svr.cfg.WebServer.User, svr.cfg.WebServer.Password
subRouter.Use(utilnet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
// metrics
if svr.cfg.EnablePrometheus {
subRouter.Handle("/metrics", promhttp.Handler())
}
// api, see dashboard_api.go
subRouter.HandleFunc("/api/serverinfo", svr.APIServerInfo).Methods("GET")
subRouter.HandleFunc("/api/proxy/{type}", svr.APIProxyByType).Methods("GET")
subRouter.HandleFunc("/api/proxy/{type}/{name}", svr.APIProxyByTypeAndName).Methods("GET")
subRouter.HandleFunc("/api/traffic/{name}", svr.APIProxyTraffic).Methods("GET")
// view
subRouter.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})
server := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: httpServerReadTimeout,
WriteTimeout: httpServerWriteTimeout,
}
ln, err := net.Listen("tcp", address)
if err != nil {
return err
}
if svr.cfg.WebServer.TLS != nil {
cert, err := tls.LoadX509KeyPair(svr.cfg.WebServer.TLS.CertFile, svr.cfg.WebServer.TLS.KeyFile)
if err != nil {
return err
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
}
ln = tls.NewListener(ln, tlsCfg)
}
go func() {
_ = server.Serve(ln)
}()
return
}

View File

@ -19,19 +19,52 @@ import (
"net/http"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/fatedier/frp/pkg/config/types"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/metrics/mem"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/log"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/version"
)
// TODO(fatedier): add an API to clean status of all offline proxies.
type GeneralResponse struct {
Code int
Msg string
}
func (svr *Service) registerRouteHandlers(helper *httppkg.RouterRegisterHelper) {
helper.Router.HandleFunc("/healthz", svr.healthz)
subRouter := helper.Router.NewRoute().Subrouter()
subRouter.Use(helper.AuthMiddleware.Middleware)
// metrics
if svr.cfg.EnablePrometheus {
subRouter.Handle("/metrics", promhttp.Handler())
}
// apis
subRouter.HandleFunc("/api/serverinfo", svr.apiServerInfo).Methods("GET")
subRouter.HandleFunc("/api/proxy/{type}", svr.apiProxyByType).Methods("GET")
subRouter.HandleFunc("/api/proxy/{type}/{name}", svr.apiProxyByTypeAndName).Methods("GET")
subRouter.HandleFunc("/api/traffic/{name}", svr.apiProxyTraffic).Methods("GET")
// view
subRouter.Handle("/favicon.ico", http.FileServer(helper.AssetsFS)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(
netpkg.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(helper.AssetsFS))),
).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})
}
type serverInfoResp struct {
Version string `json:"version"`
BindPort int `json:"bindPort"`
@ -55,12 +88,12 @@ type serverInfoResp struct {
}
// /healthz
func (svr *Service) Healthz(w http.ResponseWriter, _ *http.Request) {
func (svr *Service) healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
}
// /api/serverinfo
func (svr *Service) APIServerInfo(w http.ResponseWriter, r *http.Request) {
func (svr *Service) apiServerInfo(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
defer func() {
log.Info("Http response [%s]: code [%d]", r.URL.Path, res.Code)
@ -177,7 +210,7 @@ type GetProxyInfoResp struct {
}
// /api/proxy/:type
func (svr *Service) APIProxyByType(w http.ResponseWriter, r *http.Request) {
func (svr *Service) apiProxyByType(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
params := mux.Vars(r)
proxyType := params["type"]
@ -245,7 +278,7 @@ type GetProxyStatsResp struct {
}
// /api/proxy/:type/:name
func (svr *Service) APIProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
func (svr *Service) apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
params := mux.Vars(r)
proxyType := params["type"]
@ -314,7 +347,7 @@ type GetProxyTrafficResp struct {
TrafficOut []int64 `json:"trafficOut"`
}
func (svr *Service) APIProxyTraffic(w http.ResponseWriter, r *http.Request) {
func (svr *Service) apiProxyTraffic(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200}
params := mux.Vars(r)
name := params["name"]

View File

@ -24,7 +24,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost"
"github.com/fatedier/frp/server/metrics"
@ -180,8 +180,8 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
})
}
workConn = utilnet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = utilnet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
workConn = netpkg.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = netpkg.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
return
}

View File

@ -32,7 +32,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
@ -130,7 +130,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
}
xl.Debug("get a new work connection: [%s]", workConn.RemoteAddr().String())
xl.Spawn().AppendPrefix(pxy.GetName())
workConn = utilnet.NewContextConn(pxy.ctx, workConn)
workConn = netpkg.NewContextConn(pxy.ctx, workConn)
var (
srcAddr string

View File

@ -30,7 +30,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/server/metrics"
)
@ -222,7 +222,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
})
}
pxy.workConn = utilnet.WrapReadWriteCloserToConn(rwc, workConn)
pxy.workConn = netpkg.WrapReadWriteCloserToConn(rwc, workConn)
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(pxy.workConn)
go workConnSenderFn(pxy.workConn, ctx)

View File

@ -30,7 +30,6 @@ import (
quic "github.com/quic-go/quic-go"
"github.com/samber/lo"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/pkg/auth"
v1 "github.com/fatedier/frp/pkg/config/v1"
modelmetrics "github.com/fatedier/frp/pkg/metrics"
@ -39,8 +38,9 @@ import (
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/ssh"
"github.com/fatedier/frp/pkg/transport"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/tcpmux"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
@ -79,7 +79,8 @@ type Service struct {
// Accept frp tls connections
tlsListener net.Listener
virtualListener *utilnet.InternalListener
// Accept pipe connections from ssh tunnel gateway
sshTunnelListener *netpkg.InternalListener
// Manage all controllers
ctlManager *ControlManager
@ -96,6 +97,9 @@ type Service struct {
// All resource managers and controllers
rc *controller.ResourceController
// web server for dashboard UI and apis
webServer *httppkg.Server
sshTunnelGateway *ssh.Gateway
// Verifies authentication based on selected method
@ -111,16 +115,30 @@ type Service struct {
cancel context.CancelFunc
}
func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
func NewService(cfg *v1.ServerConfig) (*Service, error) {
tlsConfig, err := transport.NewServerTLSConfig(
cfg.Transport.TLS.CertFile,
cfg.Transport.TLS.KeyFile,
cfg.Transport.TLS.TrustedCaFile)
if err != nil {
return
return nil, err
}
svr = &Service{
var webServer *httppkg.Server
if cfg.WebServer.Port > 0 {
ws, err := httppkg.NewServer(cfg.WebServer)
if err != nil {
return nil, err
}
webServer = ws
modelmetrics.EnableMem()
if cfg.EnablePrometheus {
modelmetrics.EnablePrometheus()
}
}
svr := &Service{
ctlManager: NewControlManager(),
pxyManager: proxy.NewManager(),
pluginManager: plugin.NewManager(),
@ -129,13 +147,17 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
TCPPortManager: ports.NewManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
UDPPortManager: ports.NewManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
},
virtualListener: utilnet.NewInternalListener(),
sshTunnelListener: netpkg.NewInternalListener(),
httpVhostRouter: vhost.NewRouters(),
authVerifier: auth.NewAuthVerifier(cfg.Auth),
webServer: webServer,
tlsConfig: tlsConfig,
cfg: cfg,
ctx: context.Background(),
}
if webServer != nil {
webServer.RouteRegister(svr.registerRouteHandlers)
}
// Create tcpmux httpconnect multiplexer.
if cfg.TCPMuxHTTPConnectPort > 0 {
@ -143,14 +165,12 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.TCPMuxHTTPConnectPort))
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("create server listener error, %v", err)
return
return nil, fmt.Errorf("create server listener error, %v", err)
}
svr.rc.TCPMuxHTTPConnectMuxer, err = tcpmux.NewHTTPConnectTCPMuxer(l, cfg.TCPMuxPassthrough, vhostReadWriteTimeout)
if err != nil {
err = fmt.Errorf("create vhost tcpMuxer error, %v", err)
return
return nil, fmt.Errorf("create vhost tcpMuxer error, %v", err)
}
log.Info("tcpmux httpconnect multiplexer listen on %s, passthough: %v", address, cfg.TCPMuxPassthrough)
}
@ -191,8 +211,7 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.BindPort))
ln, err := net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("create server listener error, %v", err)
return
return nil, fmt.Errorf("create server listener error, %v", err)
}
svr.muxer = mux.NewMux(ln)
@ -208,10 +227,9 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
// Listen for accepting connections from client using kcp protocol.
if cfg.KCPBindPort > 0 {
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.KCPBindPort))
svr.kcpListener, err = utilnet.ListenKcp(address)
svr.kcpListener, err = netpkg.ListenKcp(address)
if err != nil {
err = fmt.Errorf("listen on kcp udp address %s error: %v", address, err)
return
return nil, fmt.Errorf("listen on kcp udp address %s error: %v", address, err)
}
log.Info("frps kcp listen on udp %s", address)
}
@ -226,28 +244,26 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
KeepAlivePeriod: time.Duration(cfg.Transport.QUIC.KeepalivePeriod) * time.Second,
})
if err != nil {
err = fmt.Errorf("listen on quic udp address %s error: %v", address, err)
return
return nil, fmt.Errorf("listen on quic udp address %s error: %v", address, err)
}
log.Info("frps quic listen on %s", address)
}
if cfg.SSHTunnelGateway.BindPort > 0 {
sshGateway, err := ssh.NewGateway(cfg.SSHTunnelGateway, cfg.ProxyBindAddr, svr.virtualListener)
sshGateway, err := ssh.NewGateway(cfg.SSHTunnelGateway, cfg.ProxyBindAddr, svr.sshTunnelListener)
if err != nil {
err = fmt.Errorf("create ssh gateway error: %v", err)
return nil, err
return nil, fmt.Errorf("create ssh gateway error: %v", err)
}
svr.sshTunnelGateway = sshGateway
log.Info("frps sshTunnelGateway listen on port %d", cfg.SSHTunnelGateway.BindPort)
}
// Listen for accepting connections from client using websocket protocol.
websocketPrefix := []byte("GET " + utilnet.FrpWebsocketPath)
websocketPrefix := []byte("GET " + netpkg.FrpWebsocketPath)
websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
return bytes.Equal(data, websocketPrefix)
})
svr.websocketListener = utilnet.NewWebsocketListener(websocketLn)
svr.websocketListener = netpkg.NewWebsocketListener(websocketLn)
// Create http vhost muxer.
if cfg.VhostHTTPPort > 0 {
@ -267,8 +283,7 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
} else {
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("create vhost http listener error, %v", err)
return
return nil, fmt.Errorf("create vhost http listener error, %v", err)
}
}
go func() {
@ -286,55 +301,30 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) {
address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.VhostHTTPSPort))
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("create server listener error, %v", err)
return
return nil, fmt.Errorf("create server listener error, %v", err)
}
log.Info("https service listen on %s", address)
}
svr.rc.VhostHTTPSMuxer, err = vhost.NewHTTPSMuxer(l, vhostReadWriteTimeout)
if err != nil {
err = fmt.Errorf("create vhost httpsMuxer error, %v", err)
return
return nil, fmt.Errorf("create vhost httpsMuxer error, %v", err)
}
}
// frp tls listener
svr.tlsListener = svr.muxer.Listen(2, 1, func(data []byte) bool {
// tls first byte can be 0x16 only when vhost https port is not same with bind port
return int(data[0]) == utilnet.FRPTLSHeadByte || int(data[0]) == 0x16
return int(data[0]) == netpkg.FRPTLSHeadByte || int(data[0]) == 0x16
})
// Create nat hole controller.
nc, err := nathole.NewController(time.Duration(cfg.NatHoleAnalysisDataReserveHours) * time.Hour)
if err != nil {
err = fmt.Errorf("create nat hole controller error, %v", err)
return
return nil, fmt.Errorf("create nat hole controller error, %v", err)
}
svr.rc.NatHoleController = nc
var statsEnable bool
// Create dashboard web server.
if cfg.WebServer.Port > 0 {
// Init dashboard assets
assets.Load(cfg.WebServer.AssetsDir)
address := net.JoinHostPort(cfg.WebServer.Addr, strconv.Itoa(cfg.WebServer.Port))
err = svr.RunDashboardServer(address)
if err != nil {
err = fmt.Errorf("create dashboard web server error, %v", err)
return
}
log.Info("Dashboard listen on %s", address)
statsEnable = true
}
if statsEnable {
modelmetrics.EnableMem()
if cfg.EnablePrometheus {
modelmetrics.EnablePrometheus()
}
}
return
return svr, nil
}
func (svr *Service) Run(ctx context.Context) {
@ -342,7 +332,17 @@ func (svr *Service) Run(ctx context.Context) {
svr.ctx = ctx
svr.cancel = cancel
go svr.HandleListener(svr.virtualListener, true)
// run dashboard web server.
if svr.webServer != nil {
go func() {
log.Info("dashboard listen on %s", svr.webServer.Address())
if err := svr.webServer.Run(); err != nil {
log.Warn("dashboard server exit with error: %v", err)
}
}()
}
go svr.HandleListener(svr.sshTunnelListener, true)
if svr.kcpListener != nil {
go svr.HandleListener(svr.kcpListener, false)
@ -398,7 +398,7 @@ func (svr *Service) Close() error {
return nil
}
func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, internal bool) {
xl := xlog.FromContextSafe(ctx)
var (
@ -424,7 +424,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
retContent, err := svr.pluginManager.Login(content)
if err == nil {
m = &retContent.Login
err = svr.RegisterControl(conn, m)
err = svr.RegisterControl(conn, m, internal)
}
// If login failed, send error message there.
@ -461,6 +461,9 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
}
}
// HandleListener accepts connections from client and call handleConnection to handle them.
// If internal is true, it means that this listener is used for internal communication like ssh tunnel gateway.
// TODO(fatedier): Pass some parameters of listener/connection through context to avoid passing too many parameters.
func (svr *Service) HandleListener(l net.Listener, internal bool) {
// Listen for incoming connections from client.
for {
@ -473,19 +476,21 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
xl := xlog.New()
ctx := context.Background()
c = utilnet.NewContextConn(xlog.NewContext(ctx, xl), c)
c = netpkg.NewContextConn(xlog.NewContext(ctx, xl), c)
if !internal {
log.Trace("start check TLS connection...")
originConn := c
forceTLS := svr.cfg.Transport.TLS.Force && !internal
forceTLS := svr.cfg.Transport.TLS.Force
var isTLS, custom bool
c, isTLS, custom, err = utilnet.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, forceTLS, connReadTimeout)
c, isTLS, custom, err = netpkg.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, forceTLS, connReadTimeout)
if err != nil {
log.Warn("CheckAndEnableTLSServerConnWithTimeout error: %v", err)
originConn.Close()
continue
}
log.Trace("check TLS connection success, isTLS: %v custom: %v", isTLS, custom)
log.Trace("check TLS connection success, isTLS: %v custom: %v internal: %v", isTLS, custom, internal)
}
// Start a new goroutine to handle connection.
go func(ctx context.Context, frpConn net.Conn) {
@ -508,10 +513,10 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
session.Close()
return
}
go svr.handleConnection(ctx, stream)
go svr.handleConnection(ctx, stream, internal)
}
} else {
svr.handleConnection(ctx, frpConn)
svr.handleConnection(ctx, frpConn, internal)
}
}(ctx, c)
}
@ -534,13 +539,13 @@ func (svr *Service) HandleQUICListener(l *quic.Listener) {
_ = frpConn.CloseWithError(0, "")
return
}
go svr.handleConnection(ctx, utilnet.QuicStreamToNetConn(stream, frpConn))
go svr.handleConnection(ctx, netpkg.QuicStreamToNetConn(stream, frpConn), false)
}
}(context.Background(), c)
}
}
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) error {
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login, internal bool) error {
// If client's RunID is empty, it's a new client, we just create a new controller.
// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
var err error
@ -551,7 +556,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) error
}
}
ctx := utilnet.NewContextFromConn(ctlConn)
ctx := netpkg.NewContextFromConn(ctlConn)
xl := xlog.FromContextSafe(ctx)
xl.AppendPrefix(loginMsg.RunID)
ctx = xlog.NewContext(ctx, xl)
@ -559,11 +564,16 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) error
ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)
// Check auth.
if err := svr.authVerifier.VerifyLogin(loginMsg); err != nil {
authVerifier := svr.authVerifier
if internal && loginMsg.ClientSpec.AlwaysAuthPass {
authVerifier = auth.AlwaysPassVerifier
}
if err := authVerifier.VerifyLogin(loginMsg); err != nil {
return err
}
ctl, err := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
// TODO(fatedier): use SessionContext
ctl, err := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, authVerifier, ctlConn, !internal, loginMsg, svr.cfg)
if err != nil {
xl.Warn("create new controller error: %v", err)
// don't return detailed errors to client
@ -588,7 +598,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) error
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) error {
xl := utilnet.NewLogFromConn(workConn)
xl := netpkg.NewLogFromConn(workConn)
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist {
xl.Warn("No client control found for run id [%s]", newMsg.RunID)
@ -607,7 +617,7 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn)
if err == nil {
newMsg = &retContent.NewWorkConn
// Check auth.
err = svr.authVerifier.VerifyNewWorkConn(newMsg)
err = ctl.authVerifier.VerifyNewWorkConn(newMsg)
}
if err != nil {
xl.Warn("invalid NewWorkConn with run id [%s]", newMsg.RunID)

View File

@ -23,12 +23,12 @@ import (
libio "github.com/fatedier/golib/io"
"github.com/samber/lo"
utilnet "github.com/fatedier/frp/pkg/util/net"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
)
type listenerBundle struct {
l *utilnet.InternalListener
l *netpkg.InternalListener
sk string
allowUsers []string
}
@ -46,22 +46,21 @@ func NewManager() *Manager {
}
}
func (vm *Manager) Listen(name string, sk string, allowUsers []string) (l *utilnet.InternalListener, err error) {
func (vm *Manager) Listen(name string, sk string, allowUsers []string) (*netpkg.InternalListener, error) {
vm.mu.Lock()
defer vm.mu.Unlock()
if _, ok := vm.listeners[name]; ok {
err = fmt.Errorf("custom listener for [%s] is repeated", name)
return
return nil, fmt.Errorf("custom listener for [%s] is repeated", name)
}
l = utilnet.NewInternalListener()
l := netpkg.NewInternalListener()
vm.listeners[name] = &listenerBundle{
l: l,
sk: sk,
allowUsers: allowUsers,
}
return
return l, nil
}
func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey string,
@ -91,7 +90,7 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey
if useCompression {
rwc = libio.WithCompression(rwc)
}
err = l.l.PutConn(utilnet.WrapReadWriteCloserToConn(rwc, conn))
err = l.l.PutConn(netpkg.WrapReadWriteCloserToConn(rwc, conn))
} else {
err = fmt.Errorf("custom listener for [%s] doesn't exist", name)
return

View File

@ -8,7 +8,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/test/e2e/framework"
"github.com/fatedier/frp/test/e2e/framework/consts"
"github.com/fatedier/frp/test/e2e/mock/server/streamserver"
@ -176,7 +176,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
connectRequestHost = req.Host
// return ok response
res := util.OkResponse()
res := httppkg.OkResponse()
if res.Body != nil {
defer res.Body.Close()
}

View File

@ -14,7 +14,7 @@ import (
libdial "github.com/fatedier/golib/net/dial"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/test/e2e/pkg/rpc"
)
@ -115,7 +115,7 @@ func (r *Request) HTTPHeaders(headers map[string]string) *Request {
}
func (r *Request) HTTPAuth(user, password string) *Request {
r.authValue = util.BasicAuth(user, password)
r.authValue = httppkg.BasicAuth(user, password)
return r
}

View File

@ -8,7 +8,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/fatedier/frp/pkg/util/util"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/test/e2e/framework"
"github.com/fatedier/frp/test/e2e/framework/consts"
"github.com/fatedier/frp/test/e2e/mock/server/streamserver"
@ -180,7 +180,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
connectRequestHost = req.Host
// return ok response
res := util.OkResponse()
res := httppkg.OkResponse()
if res.Body != nil {
defer res.Body.Close()
}