frp/server/control.go

567 lines
14 KiB
Go
Raw Permalink Normal View History

2017-03-23 02:01:25 +08:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-03-09 02:03:47 +08:00
package server
import (
2019-10-12 20:13:12 +08:00
"context"
2017-03-09 02:03:47 +08:00
"fmt"
2019-10-12 20:13:12 +08:00
"net"
2018-05-11 10:42:57 +08:00
"runtime/debug"
2017-03-09 02:03:47 +08:00
"sync"
"sync/atomic"
2017-03-09 02:03:47 +08:00
"time"
"github.com/samber/lo"
2022-08-29 01:02:53 +08:00
2020-09-23 13:49:14 +08:00
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
2023-05-29 14:10:34 +08:00
pkgerr "github.com/fatedier/frp/pkg/errors"
2020-09-23 13:49:14 +08:00
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
2023-11-27 15:47:49 +08:00
netpkg "github.com/fatedier/frp/pkg/util/net"
2020-09-23 13:49:14 +08:00
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
2020-09-23 13:49:14 +08:00
"github.com/fatedier/frp/pkg/util/xlog"
2019-01-15 00:11:08 +08:00
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
2019-01-15 00:11:08 +08:00
"github.com/fatedier/frp/server/proxy"
2017-03-09 02:03:47 +08:00
)
2019-01-15 00:11:08 +08:00
type ControlManager struct {
// controls indexed by run id
2020-05-24 17:48:37 +08:00
ctlsByRunID map[string]*Control
2019-01-15 00:11:08 +08:00
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
2020-05-24 17:48:37 +08:00
ctlsByRunID: make(map[string]*Control),
2019-01-15 00:11:08 +08:00
}
}
func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
2019-01-15 00:11:08 +08:00
cm.mu.Lock()
defer cm.mu.Unlock()
var ok bool
old, ok = cm.ctlsByRunID[runID]
2019-01-15 00:11:08 +08:00
if ok {
old.Replaced(ctl)
2019-01-15 00:11:08 +08:00
}
2020-05-24 17:48:37 +08:00
cm.ctlsByRunID[runID] = ctl
2019-01-15 00:11:08 +08:00
return
}
2019-01-26 21:36:24 +08:00
// we should make sure if it's the same control to prevent delete a new one
2020-05-24 17:48:37 +08:00
func (cm *ControlManager) Del(runID string, ctl *Control) {
2019-01-15 00:11:08 +08:00
cm.mu.Lock()
defer cm.mu.Unlock()
2020-05-24 17:48:37 +08:00
if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
delete(cm.ctlsByRunID, runID)
2019-01-26 21:36:24 +08:00
}
2019-01-15 00:11:08 +08:00
}
2020-05-24 17:48:37 +08:00
func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
2019-01-15 00:11:08 +08:00
cm.mu.RLock()
defer cm.mu.RUnlock()
2020-05-24 17:48:37 +08:00
ctl, ok = cm.ctlsByRunID[runID]
2019-01-15 00:11:08 +08:00
return
}
// Close closes every registered control and resets the index to an empty
// map. It always returns nil.
func (cm *ControlManager) Close() error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	for runID := range cm.ctlsByRunID {
		cm.ctlsByRunID[runID].Close()
	}
	cm.ctlsByRunID = map[string]*Control{}
	return nil
}
2017-03-09 02:03:47 +08:00
type Control struct {
2019-01-10 20:53:06 +08:00
// all resource managers and controllers
2019-01-15 00:11:08 +08:00
rc *controller.ResourceController
// proxy manager
2020-05-24 17:48:37 +08:00
pxyManager *proxy.Manager
2019-01-15 00:11:08 +08:00
2019-12-20 20:28:28 +08:00
// plugin manager
pluginManager *plugin.Manager
// verifies authentication based on selected method
authVerifier auth.Verifier
// other components can use this to communicate with client
msgTransporter transport.MessageTransporter
// msgDispatcher is a wrapper for control connection.
// It provides a channel for sending messages, and you can register handlers to process messages based on their respective types.
msgDispatcher *msg.Dispatcher
2017-03-09 02:03:47 +08:00
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// work connections
workConnCh chan net.Conn
// proxies in one client
2019-01-15 00:11:08 +08:00
proxies map[string]proxy.Proxy
2017-03-09 02:03:47 +08:00
// pool count
poolCount int
2018-01-26 14:56:55 +08:00
// ports used, for limitations
portsUsedNum int
2017-03-09 02:03:47 +08:00
// last time got the Ping message
lastPing atomic.Value
2017-03-09 02:03:47 +08:00
// A new run id will be generated when a new client login.
// If run id got from login message has same run id, it means it's the same client, so we can
// replace old controller instantly.
2020-05-24 17:48:37 +08:00
runID string
2017-03-09 02:03:47 +08:00
mu sync.RWMutex
// Server configuration information
serverCfg *v1.ServerConfig
2019-10-12 20:13:12 +08:00
xl *xlog.Logger
ctx context.Context
doneCh chan struct{}
2017-03-09 02:03:47 +08:00
}
2023-11-27 15:47:49 +08:00
// TODO(fatedier): Referencing the implementation of frpc, encapsulate the input parameters as SessionContext.
2019-12-20 20:28:28 +08:00
func NewControl(
ctx context.Context,
rc *controller.ResourceController,
2020-05-24 17:48:37 +08:00
pxyManager *proxy.Manager,
2019-12-20 20:28:28 +08:00
pluginManager *plugin.Manager,
authVerifier auth.Verifier,
2019-12-20 20:28:28 +08:00
ctlConn net.Conn,
2023-11-27 15:47:49 +08:00
ctlConnEncrypted bool,
2019-12-20 20:28:28 +08:00
loginMsg *msg.Login,
serverCfg *v1.ServerConfig,
) (*Control, error) {
2019-08-29 21:13:21 +08:00
poolCount := loginMsg.PoolCount
if poolCount > int(serverCfg.Transport.MaxPoolCount) {
poolCount = int(serverCfg.Transport.MaxPoolCount)
2019-08-29 21:13:21 +08:00
}
ctl := &Control{
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
workConnCh: make(chan net.Conn, poolCount+10),
proxies: make(map[string]proxy.Proxy),
poolCount: poolCount,
portsUsedNum: 0,
runID: loginMsg.RunID,
serverCfg: serverCfg,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
doneCh: make(chan struct{}),
2017-03-09 02:03:47 +08:00
}
ctl.lastPing.Store(time.Now())
2023-11-27 15:47:49 +08:00
if ctlConnEncrypted {
cryptoRW, err := netpkg.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
} else {
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
return ctl, nil
2017-03-09 02:03:47 +08:00
}
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
2017-03-10 01:42:06 +08:00
loginRespMsg := &msg.LoginResp{
2023-05-29 01:09:53 +08:00
Version: version.Full(),
RunID: ctl.runID,
Error: "",
2017-03-09 02:03:47 +08:00
}
2022-08-29 01:02:53 +08:00
_ = msg.WriteMsg(ctl.conn, loginRespMsg)
2017-03-09 02:03:47 +08:00
go func() {
for i := 0; i < ctl.poolCount; i++ {
// ignore error here, that means that this control is closed
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
}
}()
go ctl.worker()
2017-03-09 02:03:47 +08:00
}
// Close closes the control connection. The close error is discarded and the
// method always returns nil.
func (ctl *Control) Close() error {
	_ = ctl.conn.Close()
	return nil
}
// Replaced is called when newCtl takes over this control's place. It logs the
// replacement, clears this control's run id and closes its connection.
func (ctl *Control) Replaced(newCtl *Control) {
	ctl.xl.Info("Replaced by client [%s]", newCtl.runID)
	ctl.runID = ""
	_ = ctl.conn.Close()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
2019-10-12 20:13:12 +08:00
xl := ctl.xl
2017-03-09 02:03:47 +08:00
defer func() {
if err := recover(); err != nil {
2019-10-12 20:13:12 +08:00
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
2017-03-09 02:03:47 +08:00
}
}()
select {
case ctl.workConnCh <- conn:
2019-10-12 20:13:12 +08:00
xl.Debug("new work connection registered")
return nil
2017-03-09 02:03:47 +08:00
default:
2019-10-12 20:13:12 +08:00
xl.Debug("work connection pool is full, discarding")
return fmt.Errorf("work connection pool is full, discarding")
2017-03-09 02:03:47 +08:00
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
2019-10-12 20:13:12 +08:00
xl := ctl.xl
2017-03-09 02:03:47 +08:00
defer func() {
if err := recover(); err != nil {
2019-10-12 20:13:12 +08:00
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
2017-03-09 02:03:47 +08:00
}
}()
var ok bool
// get a work connection from the pool
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
2023-05-29 14:10:34 +08:00
err = pkgerr.ErrCtlClosed
2017-03-09 02:03:47 +08:00
return
}
2019-10-12 20:13:12 +08:00
xl.Debug("get work connection from pool")
2017-03-09 02:03:47 +08:00
default:
// no work connections available in the poll, send message to frpc to get more
if err := ctl.msgDispatcher.Send(&msg.ReqWorkConn{}); err != nil {
return nil, fmt.Errorf("control is already closed")
2017-03-09 02:03:47 +08:00
}
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
2023-05-29 14:10:34 +08:00
err = pkgerr.ErrCtlClosed
xl.Warn("no work connections available, %v", err)
2017-03-09 02:03:47 +08:00
return
}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
2017-03-09 02:03:47 +08:00
err = fmt.Errorf("timeout trying to get work connection")
2019-10-12 20:13:12 +08:00
xl.Warn("%v", err)
2017-03-09 02:03:47 +08:00
return
}
}
// When we get a work connection from pool, replace it with a new one.
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
2017-03-09 02:03:47 +08:00
return
}
func (ctl *Control) heartbeatWorker() {
2019-10-12 20:13:12 +08:00
xl := ctl.xl
2017-03-09 02:03:47 +08:00
// Don't need application heartbeat if TCPMux is enabled,
// yamux will do same thing.
// TODO(fatedier): let default HeartbeatTimeout to -1 if TCPMux is enabled. Users can still set it to positive value to enable it.
if !lo.FromPtr(ctl.serverCfg.Transport.TCPMux) && ctl.serverCfg.Transport.HeartbeatTimeout > 0 {
go wait.Until(func() {
if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
2023-11-27 15:47:49 +08:00
ctl.conn.Close()
2017-03-09 02:03:47 +08:00
return
}
}, time.Second, ctl.doneCh)
2017-03-09 02:03:47 +08:00
}
}
// WaitClosed blocks until the Control is closed, that is, until doneCh has
// been closed (worker closes it as its last step).
func (ctl *Control) WaitClosed() {
	<-ctl.doneCh
}
func (ctl *Control) worker() {
2019-10-12 20:13:12 +08:00
xl := ctl.xl
2017-03-09 02:03:47 +08:00
go ctl.heartbeatWorker()
go ctl.msgDispatcher.Run()
2017-03-09 02:03:47 +08:00
<-ctl.msgDispatcher.Done()
ctl.conn.Close()
2017-03-09 02:03:47 +08:00
2018-03-19 20:22:15 +08:00
ctl.mu.Lock()
defer ctl.mu.Unlock()
2017-03-09 02:03:47 +08:00
close(ctl.workConnCh)
for workConn := range ctl.workConnCh {
workConn.Close()
}
for _, pxy := range ctl.proxies {
pxy.Close()
2019-01-15 00:11:08 +08:00
ctl.pxyManager.Del(pxy.GetName())
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
2022-08-29 01:02:53 +08:00
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
2017-03-09 02:03:47 +08:00
}
metrics.Server.CloseClient()
xl.Info("client exit success")
close(ctl.doneCh)
2019-01-15 00:11:08 +08:00
}
func (ctl *Control) registerMsgHandlers() {
ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy)
ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing)
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport))
ctl.msgDispatcher.RegisterHandler(&msg.CloseProxy{}, ctl.handleCloseProxy)
2017-03-09 02:03:47 +08:00
}
func (ctl *Control) handleNewProxy(m msg.Message) {
2019-10-12 20:13:12 +08:00
xl := ctl.xl
inMsg := m.(*msg.NewProxy)
content := &plugin.NewProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewProxy: *inMsg,
}
var remoteAddr string
retContent, err := ctl.pluginManager.NewProxy(content)
if err == nil {
inMsg = &retContent.NewProxy
remoteAddr, err = ctl.RegisterProxy(inMsg)
}
2017-03-09 02:03:47 +08:00
// register proxy in this control
resp := &msg.NewProxyResp{
ProxyName: inMsg.ProxyName,
}
if err != nil {
xl.Warn("new proxy [%s] type [%s] error: %v", inMsg.ProxyName, inMsg.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", inMsg.ProxyName),
err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
} else {
resp.RemoteAddr = remoteAddr
xl.Info("new proxy [%s] type [%s] success", inMsg.ProxyName, inMsg.ProxyType)
metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)
}
_ = ctl.msgDispatcher.Send(resp)
}
2017-03-09 02:03:47 +08:00
func (ctl *Control) handlePing(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.Ping)
content := &plugin.PingContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
Ping: *inMsg,
}
retContent, err := ctl.pluginManager.Ping(content)
if err == nil {
inMsg = &retContent.Ping
err = ctl.authVerifier.VerifyPing(inMsg)
2017-03-09 02:03:47 +08:00
}
if err != nil {
xl.Warn("received invalid ping: %v", err)
_ = ctl.msgDispatcher.Send(&msg.Pong{
Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
})
return
}
ctl.lastPing.Store(time.Now())
xl.Debug("receive heartbeat")
_ = ctl.msgDispatcher.Send(&msg.Pong{})
2017-03-09 02:03:47 +08:00
}
// handleNatHoleVisitor forwards a NatHoleVisitor message to the NAT hole
// controller, together with this control's message transporter and the
// login user.
func (ctl *Control) handleNatHoleVisitor(m msg.Message) {
	visitorMsg := m.(*msg.NatHoleVisitor)
	natHole := ctl.rc.NatHoleController
	natHole.HandleVisitor(visitorMsg, ctl.msgTransporter, ctl.loginMsg.User)
}
// handleNatHoleClient forwards a NatHoleClient message to the NAT hole
// controller, together with this control's message transporter.
func (ctl *Control) handleNatHoleClient(m msg.Message) {
	clientMsg := m.(*msg.NatHoleClient)
	natHole := ctl.rc.NatHoleController
	natHole.HandleClient(clientMsg, ctl.msgTransporter)
}
// handleNatHoleReport forwards a NatHoleReport message to the NAT hole
// controller.
func (ctl *Control) handleNatHoleReport(m msg.Message) {
	reportMsg := m.(*msg.NatHoleReport)
	natHole := ctl.rc.NatHoleController
	natHole.HandleReport(reportMsg)
}
// handleCloseProxy handles a CloseProxy message by closing the named proxy
// and logging the result. The value returned by CloseProxy is ignored.
func (ctl *Control) handleCloseProxy(m msg.Message) {
	closeMsg := m.(*msg.CloseProxy)
	_ = ctl.CloseProxy(closeMsg)
	ctl.xl.Info("close proxy [%s] success", closeMsg.ProxyName)
}
2018-01-17 14:40:08 +08:00
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf v1.ProxyConfigurer
// Load configures from NewProxy message and validate.
pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, ctl.serverCfg)
2017-03-09 02:03:47 +08:00
if err != nil {
2018-01-17 14:40:08 +08:00
return
2017-03-09 02:03:47 +08:00
}
// User info
userInfo := plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
2020-05-24 17:48:37 +08:00
RunID: ctl.runID,
}
// NewProxy will return an interface Proxy.
// In fact, it creates different proxies based on the proxy type. We just call run() here.
2023-09-20 15:18:50 +08:00
pxy, err := proxy.NewProxy(ctl.ctx, &proxy.Options{
UserInfo: userInfo,
LoginMsg: ctl.loginMsg,
PoolCount: ctl.poolCount,
ResourceController: ctl.rc,
GetWorkConnFn: ctl.GetWorkConn,
Configurer: pxyConf,
ServerCfg: ctl.serverCfg,
})
2017-03-09 02:03:47 +08:00
if err != nil {
2018-01-17 14:40:08 +08:00
return remoteAddr, err
2017-03-09 02:03:47 +08:00
}
2018-01-26 14:56:55 +08:00
// Check ports used number in each client
if ctl.serverCfg.MaxPortsPerClient > 0 {
2018-01-26 14:56:55 +08:00
ctl.mu.Lock()
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
2018-01-26 14:56:55 +08:00
ctl.mu.Unlock()
err = fmt.Errorf("exceed the max_ports_per_client")
return
}
2022-08-29 01:02:53 +08:00
ctl.portsUsedNum += pxy.GetUsedPortsNum()
2018-01-26 14:56:55 +08:00
ctl.mu.Unlock()
defer func() {
if err != nil {
ctl.mu.Lock()
2022-08-29 01:02:53 +08:00
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
2018-01-26 14:56:55 +08:00
ctl.mu.Unlock()
}
}()
}
if ctl.pxyManager.Exist(pxyMsg.ProxyName) {
err = fmt.Errorf("proxy [%s] already exists", pxyMsg.ProxyName)
return
}
2018-01-17 14:40:08 +08:00
remoteAddr, err = pxy.Run()
2017-03-09 02:03:47 +08:00
if err != nil {
2018-01-17 14:40:08 +08:00
return
2017-03-09 02:03:47 +08:00
}
defer func() {
if err != nil {
pxy.Close()
}
}()
2019-01-15 00:11:08 +08:00
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
2017-03-09 02:03:47 +08:00
if err != nil {
2018-01-17 14:40:08 +08:00
return
2017-03-09 02:03:47 +08:00
}
2017-06-11 17:22:05 +08:00
ctl.mu.Lock()
ctl.proxies[pxy.GetName()] = pxy
ctl.mu.Unlock()
2018-01-17 14:40:08 +08:00
return
2017-03-09 02:03:47 +08:00
}
2017-06-11 17:22:05 +08:00
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
2018-01-26 14:56:55 +08:00
ctl.mu.Unlock()
2017-06-11 17:22:05 +08:00
return
}
if ctl.serverCfg.MaxPortsPerClient > 0 {
2022-08-29 01:02:53 +08:00
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
2018-01-26 14:56:55 +08:00
}
2017-06-11 17:22:05 +08:00
pxy.Close()
2019-01-15 00:11:08 +08:00
ctl.pxyManager.Del(pxy.GetName())
2017-06-27 01:59:30 +08:00
delete(ctl.proxies, closeMsg.ProxyName)
2018-01-26 14:56:55 +08:00
ctl.mu.Unlock()
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
2022-08-29 01:02:53 +08:00
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
2017-06-11 17:22:05 +08:00
return
}