frps: optimize code

This commit is contained in:
fatedier 2019-01-10 20:53:06 +08:00
parent 7c21906884
commit 0c7d778896
6 changed files with 116 additions and 109 deletions

View File

@ -35,8 +35,8 @@ import (
) )
type Control struct { type Control struct {
// frps service // all resource managers and controllers
svr *Service rc *ResourceController
// login message // login message
loginMsg *msg.Login loginMsg *msg.Login
@ -81,9 +81,9 @@ type Control struct {
mu sync.RWMutex mu sync.RWMutex
} }
func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control { func NewControl(rc *ResourceController, ctlConn net.Conn, loginMsg *msg.Login) *Control {
return &Control{ return &Control{
svr: svr, rc: rc,
conn: ctlConn, conn: ctlConn,
loginMsg: loginMsg, loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10), sendCh: make(chan msg.Message, 10),
@ -284,11 +284,12 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies { for _, pxy := range ctl.proxies {
pxy.Close() pxy.Close()
ctl.svr.DelProxy(pxy.GetName()) ctl.rc.PxyManager.Del(pxy.GetName())
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
} }
ctl.allShutdown.Done() ctl.allShutdown.Done()
ctl.rc.CtlManager.Del(ctl.runId)
ctl.conn.Info("client exit success") ctl.conn.Info("client exit success")
StatsCloseClient() StatsCloseClient()
@ -358,7 +359,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy. // NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here. // In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := NewProxy(ctl, pxyConf) pxy, err := NewProxy(ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf)
if err != nil { if err != nil {
return remoteAddr, err return remoteAddr, err
} }
@ -393,7 +394,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
} }
}() }()
err = ctl.svr.RegisterProxy(pxyMsg.ProxyName, pxy) err = ctl.rc.PxyManager.Add(pxyMsg.ProxyName, pxy)
if err != nil { if err != nil {
return return
} }
@ -406,7 +407,6 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) { func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock() ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName] pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok { if !ok {
ctl.mu.Unlock() ctl.mu.Unlock()
@ -417,7 +417,7 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum() ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
} }
pxy.Close() pxy.Close()
ctl.svr.DelProxy(pxy.GetName()) ctl.rc.PxyManager.Del(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName) delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock() ctl.mu.Unlock()

View File

@ -32,7 +32,7 @@ var (
httpServerWriteTimeout = 10 * time.Second httpServerWriteTimeout = 10 * time.Second
) )
func RunDashboardServer(addr string, port int) (err error) { func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
// url router // url router
router := mux.NewRouter() router := mux.NewRouter()
@ -40,10 +40,10 @@ func RunDashboardServer(addr string, port int) (err error) {
router.Use(frpNet.NewHttpAuthMiddleware(user, passwd).Middleware) router.Use(frpNet.NewHttpAuthMiddleware(user, passwd).Middleware)
// api, see dashboard_api.go // api, see dashboard_api.go
router.HandleFunc("/api/serverinfo", apiServerInfo).Methods("GET") router.HandleFunc("/api/serverinfo", svr.ApiServerInfo).Methods("GET")
router.HandleFunc("/api/proxy/{type}", apiProxyByType).Methods("GET") router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET")
router.HandleFunc("/api/proxy/{type}/{name}", apiProxyByTypeAndName).Methods("GET") router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET")
router.HandleFunc("/api/traffic/{name}", apiProxyTraffic).Methods("GET") router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET")
// view // view
router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET") router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")

View File

@ -32,7 +32,6 @@ type GeneralResponse struct {
Msg string `json:"msg"` Msg string `json:"msg"`
} }
// api/serverinfo
type ServerInfoResp struct { type ServerInfoResp struct {
GeneralResponse GeneralResponse
@ -55,7 +54,8 @@ type ServerInfoResp struct {
ProxyTypeCounts map[string]int64 `json:"proxy_type_count"` ProxyTypeCounts map[string]int64 `json:"proxy_type_count"`
} }
func apiServerInfo(w http.ResponseWriter, r *http.Request) { // api/serverinfo
func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) {
var ( var (
buf []byte buf []byte
res ServerInfoResp res ServerInfoResp
@ -162,7 +162,7 @@ type GetProxyInfoResp struct {
} }
// api/proxy/:type // api/proxy/:type
func apiProxyByType(w http.ResponseWriter, r *http.Request) { func (svr *Service) ApiProxyByType(w http.ResponseWriter, r *http.Request) {
var ( var (
buf []byte buf []byte
res GetProxyInfoResp res GetProxyInfoResp
@ -177,19 +177,19 @@ func apiProxyByType(w http.ResponseWriter, r *http.Request) {
}() }()
log.Info("Http request: [%s]", r.URL.Path) log.Info("Http request: [%s]", r.URL.Path)
res.Proxies = getProxyStatsByType(proxyType) res.Proxies = svr.getProxyStatsByType(proxyType)
buf, _ = json.Marshal(&res) buf, _ = json.Marshal(&res)
w.Write(buf) w.Write(buf)
} }
func getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) { func (svr *Service) getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := StatsGetProxiesByType(proxyType) proxyStats := StatsGetProxiesByType(proxyType)
proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats)) proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats))
for _, ps := range proxyStats { for _, ps := range proxyStats {
proxyInfo := &ProxyStatsInfo{} proxyInfo := &ProxyStatsInfo{}
if pxy, ok := ServerService.pxyManager.GetByName(ps.Name); ok { if pxy, ok := svr.rc.PxyManager.GetByName(ps.Name); ok {
content, err := json.Marshal(pxy.GetConf()) content, err := json.Marshal(pxy.GetConf())
if err != nil { if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err) log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -230,7 +230,7 @@ type GetProxyStatsResp struct {
} }
// api/proxy/:type/:name // api/proxy/:type/:name
func apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) { func (svr *Service) ApiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
var ( var (
buf []byte buf []byte
res GetProxyStatsResp res GetProxyStatsResp
@ -244,20 +244,20 @@ func apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
}() }()
log.Info("Http request: [%s]", r.URL.Path) log.Info("Http request: [%s]", r.URL.Path)
res = getProxyStatsByTypeAndName(proxyType, name) res = svr.getProxyStatsByTypeAndName(proxyType, name)
buf, _ = json.Marshal(&res) buf, _ = json.Marshal(&res)
w.Write(buf) w.Write(buf)
} }
func getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) { func (svr *Service) getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) {
proxyInfo.Name = proxyName proxyInfo.Name = proxyName
ps := StatsGetProxiesByTypeAndName(proxyType, proxyName) ps := StatsGetProxiesByTypeAndName(proxyType, proxyName)
if ps == nil { if ps == nil {
proxyInfo.Code = 1 proxyInfo.Code = 1
proxyInfo.Msg = "no proxy info found" proxyInfo.Msg = "no proxy info found"
} else { } else {
if pxy, ok := ServerService.pxyManager.GetByName(proxyName); ok { if pxy, ok := svr.rc.PxyManager.GetByName(proxyName); ok {
content, err := json.Marshal(pxy.GetConf()) content, err := json.Marshal(pxy.GetConf())
if err != nil { if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err) log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -295,7 +295,7 @@ type GetProxyTrafficResp struct {
TrafficOut []int64 `json:"traffic_out"` TrafficOut []int64 `json:"traffic_out"`
} }
func apiProxyTraffic(w http.ResponseWriter, r *http.Request) { func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
var ( var (
buf []byte buf []byte
res GetProxyTrafficResp res GetProxyTrafficResp

View File

@ -50,6 +50,12 @@ func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
return return
} }
func (cm *ControlManager) Del(runId string) {
cm.mu.Lock()
defer cm.mu.Unlock()
delete(cm.ctlsByRunId, runId)
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) { func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock() cm.mu.RLock()
defer cm.mu.RUnlock() defer cm.mu.RUnlock()

View File

@ -36,9 +36,10 @@ import (
frpIo "github.com/fatedier/golib/io" frpIo "github.com/fatedier/golib/io"
) )
type GetWorkConnFn func() (frpNet.Conn, error)
type Proxy interface { type Proxy interface {
Run() (remoteAddr string, err error) Run() (remoteAddr string, err error)
GetControl() *Control
GetName() string GetName() string
GetConf() config.ProxyConf GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error) GetWorkConnFromPool() (workConn frpNet.Conn, err error)
@ -48,10 +49,12 @@ type Proxy interface {
} }
type BaseProxy struct { type BaseProxy struct {
name string name string
ctl *Control rc *ResourceController
listeners []frpNet.Listener listeners []frpNet.Listener
usedPortsNum int usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
mu sync.RWMutex mu sync.RWMutex
log.Logger log.Logger
@ -61,10 +64,6 @@ func (pxy *BaseProxy) GetName() string {
return pxy.name return pxy.name
} }
func (pxy *BaseProxy) GetControl() *Control {
return pxy.ctl
}
func (pxy *BaseProxy) GetUsedPortsNum() int { func (pxy *BaseProxy) GetUsedPortsNum() int {
return pxy.usedPortsNum return pxy.usedPortsNum
} }
@ -77,10 +76,9 @@ func (pxy *BaseProxy) Close() {
} }
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
ctl := pxy.GetControl()
// try all connections from the pool // try all connections from the pool
for i := 0; i < ctl.poolCount+1; i++ { for i := 0; i < pxy.poolCount+1; i++ {
if workConn, err = ctl.GetWorkConn(); err != nil { if workConn, err = pxy.getWorkConnFn(); err != nil {
pxy.Warn("failed to get work connection: %v", err) pxy.Warn("failed to get work connection: %v", err)
return return
} }
@ -126,12 +124,14 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Con
} }
} }
func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) { func NewProxy(runId string, rc *ResourceController, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{ basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName, name: pxyConf.GetBaseInfo().ProxyName,
ctl: ctl, rc: rc,
listeners: make([]frpNet.Listener, 0), listeners: make([]frpNet.Listener, 0),
Logger: log.NewPrefixLogger(ctl.runId), poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
Logger: log.NewPrefixLogger(runId),
} }
switch cfg := pxyConf.(type) { switch cfg := pxyConf.(type) {
case *config.TcpProxyConf: case *config.TcpProxyConf:
@ -182,7 +182,7 @@ type TcpProxy struct {
func (pxy *TcpProxy) Run() (remoteAddr string, err error) { func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" { if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.ctl.svr.tcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort) l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil { if errRet != nil {
err = errRet err = errRet
return return
@ -198,13 +198,13 @@ func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
pxy.listeners = append(pxy.listeners, listener) pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
} else { } else {
pxy.realPort, err = pxy.ctl.svr.tcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil { if err != nil {
return return
} }
defer func() { defer func() {
if err != nil { if err != nil {
pxy.ctl.svr.tcpPortManager.Release(pxy.realPort) pxy.rc.TcpPortManager.Release(pxy.realPort)
} }
}() }()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort) listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
@ -230,7 +230,7 @@ func (pxy *TcpProxy) GetConf() config.ProxyConf {
func (pxy *TcpProxy) Close() { func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
if pxy.cfg.Group == "" { if pxy.cfg.Group == "" {
pxy.ctl.svr.tcpPortManager.Release(pxy.realPort) pxy.rc.TcpPortManager.Release(pxy.realPort)
} }
} }
@ -260,7 +260,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig.Domain = domain routeConfig.Domain = domain
for _, location := range locations { for _, location := range locations {
routeConfig.Location = location routeConfig.Location = location
err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig) err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil { if err != nil {
return return
} }
@ -268,7 +268,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
tmpLocation := routeConfig.Location tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort))) addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort)))
pxy.closeFuncs = append(pxy.closeFuncs, func() { pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation) pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
}) })
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
} }
@ -278,7 +278,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
for _, location := range locations { for _, location := range locations {
routeConfig.Location = location routeConfig.Location = location
err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig) err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil { if err != nil {
return return
} }
@ -286,7 +286,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
tmpLocation := routeConfig.Location tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort)) addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
pxy.closeFuncs = append(pxy.closeFuncs, func() { pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation) pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
}) })
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
} }
@ -348,7 +348,7 @@ func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
addrs := make([]string, 0) addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains { for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain routeConfig.Domain = domain
l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig) l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil { if errRet != nil {
err = errRet err = errRet
return return
@ -361,7 +361,7 @@ func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.SubDomain != "" { if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig) l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil { if errRet != nil {
err = errRet err = errRet
return return
@ -391,7 +391,7 @@ type StcpProxy struct {
} }
func (pxy *StcpProxy) Run() (remoteAddr string, err error) { func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
listener, errRet := pxy.ctl.svr.visitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil { if errRet != nil {
err = errRet err = errRet
return return
@ -410,7 +410,7 @@ func (pxy *StcpProxy) GetConf() config.ProxyConf {
func (pxy *StcpProxy) Close() { func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
pxy.ctl.svr.visitorManager.CloseListener(pxy.GetName()) pxy.rc.VisitorManager.CloseListener(pxy.GetName())
} }
type XtcpProxy struct { type XtcpProxy struct {
@ -421,12 +421,12 @@ type XtcpProxy struct {
} }
func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
if pxy.ctl.svr.natHoleController == nil { if pxy.rc.NatHoleController == nil {
pxy.Error("udp port for xtcp is not specified.") pxy.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps") err = fmt.Errorf("xtcp is not supported in frps")
return return
} }
sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk) sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() { go func() {
for { for {
select { select {
@ -456,7 +456,7 @@ func (pxy *XtcpProxy) GetConf() config.ProxyConf {
func (pxy *XtcpProxy) Close() { func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
pxy.ctl.svr.natHoleController.CloseClient(pxy.GetName()) pxy.rc.NatHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() { errors.PanicToError(func() {
close(pxy.closeCh) close(pxy.closeCh)
}) })
@ -488,13 +488,13 @@ type UdpProxy struct {
} }
func (pxy *UdpProxy) Run() (remoteAddr string, err error) { func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
pxy.realPort, err = pxy.ctl.svr.udpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil { if err != nil {
return return
} }
defer func() { defer func() {
if err != nil { if err != nil {
pxy.ctl.svr.udpPortManager.Release(pxy.realPort) pxy.rc.UdpPortManager.Release(pxy.realPort)
} }
}() }()
@ -648,7 +648,7 @@ func (pxy *UdpProxy) Close() {
close(pxy.readCh) close(pxy.readCh)
close(pxy.sendCh) close(pxy.sendCh)
} }
pxy.ctl.svr.udpPortManager.Release(pxy.realPort) pxy.rc.UdpPortManager.Release(pxy.realPort)
} }
// HandleUserTcpConnection is used for incoming tcp user connections. // HandleUserTcpConnection is used for incoming tcp user connections.

View File

@ -43,6 +43,36 @@ const (
var ServerService *Service var ServerService *Service
// All resource managers and controllers
type ResourceController struct {
// Manage all controllers
CtlManager *ControlManager
// Manage all proxies
PxyManager *ProxyManager
// Manage all visitor listeners
VisitorManager *VisitorManager
// Tcp Group Controller
TcpGroupCtl *group.TcpGroupCtl
// Manage all tcp ports
TcpPortManager *ports.PortManager
// Manage all udp ports
UdpPortManager *ports.PortManager
// For http proxies, forwarding http requests
HttpReverseProxy *vhost.HttpReverseProxy
// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer
// Controller for nat hole connections
NatHoleController *NatHoleController
}
// Server service // Server service
type Service struct { type Service struct {
// Dispatch connections to different handlers listen on same port // Dispatch connections to different handlers listen on same port
@ -57,43 +87,22 @@ type Service struct {
// Accept connections using websocket // Accept connections using websocket
websocketListener frpNet.Listener websocketListener frpNet.Listener
// For https proxies, route requests to different clients by hostname and other infomation // All resource managers and controllers
VhostHttpsMuxer *vhost.HttpsMuxer rc *ResourceController
httpReverseProxy *vhost.HttpReverseProxy
// Manage all controllers
ctlManager *ControlManager
// Manage all proxies
pxyManager *ProxyManager
// Manage all visitor listeners
visitorManager *VisitorManager
// Manage all tcp ports
tcpPortManager *ports.PortManager
// Manage all udp ports
udpPortManager *ports.PortManager
// Tcp Group Controller
tcpGroupCtl *group.TcpGroupCtl
// Controller for nat hole connections
natHoleController *NatHoleController
} }
func NewService() (svr *Service, err error) { func NewService() (svr *Service, err error) {
cfg := &g.GlbServerCfg.ServerCommonConf cfg := &g.GlbServerCfg.ServerCommonConf
svr = &Service{ svr = &Service{
ctlManager: NewControlManager(), rc: &ResourceController{
pxyManager: NewProxyManager(), CtlManager: NewControlManager(),
visitorManager: NewVisitorManager(), PxyManager: NewProxyManager(),
tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts), VisitorManager: NewVisitorManager(),
udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts), TcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
UdpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
},
} }
svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager) svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager)
// Init assets. // Init assets.
err = assets.Load(cfg.AssetsDir) err = assets.Load(cfg.AssetsDir)
@ -151,7 +160,7 @@ func NewService() (svr *Service, err error) {
rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{ rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{
ResponseHeaderTimeoutS: cfg.VhostHttpTimeout, ResponseHeaderTimeoutS: cfg.VhostHttpTimeout,
}) })
svr.httpReverseProxy = rp svr.rc.HttpReverseProxy = rp
address := fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort) address := fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
server := &http.Server{ server := &http.Server{
@ -185,7 +194,7 @@ func NewService() (svr *Service, err error) {
} }
} }
svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second) svr.rc.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
if err != nil { if err != nil {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err) err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return return
@ -202,13 +211,13 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create nat hole controller error, %v", err) err = fmt.Errorf("Create nat hole controller error, %v", err)
return return
} }
svr.natHoleController = nc svr.rc.NatHoleController = nc
log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort) log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
} }
// Create dashboard web server. // Create dashboard web server.
if cfg.DashboardPort > 0 { if cfg.DashboardPort > 0 {
err = RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort) err = svr.RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
if err != nil { if err != nil {
err = fmt.Errorf("Create dashboard web server error, %v", err) err = fmt.Errorf("Create dashboard web server error, %v", err)
return return
@ -220,8 +229,8 @@ func NewService() (svr *Service, err error) {
} }
func (svr *Service) Run() { func (svr *Service) Run() {
if svr.natHoleController != nil { if svr.rc.NatHoleController != nil {
go svr.natHoleController.Run() go svr.rc.NatHoleController.Run()
} }
if g.GlbServerCfg.KcpBindPort > 0 { if g.GlbServerCfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener) go svr.HandleListener(svr.kcpListener)
@ -346,9 +355,9 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
} }
} }
ctl := NewControl(svr, ctlConn, loginMsg) ctl := NewControl(svr.rc, ctlConn, loginMsg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { if oldCtl := svr.rc.CtlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone() oldCtl.allShutdown.WaitDone()
} }
@ -362,7 +371,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
// RegisterWorkConn register a new work connection to control and proxies need it. // RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) { func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) {
ctl, exist := svr.ctlManager.GetById(newMsg.RunId) ctl, exist := svr.rc.CtlManager.GetById(newMsg.RunId)
if !exist { if !exist {
workConn.Warn("No client control found for run id [%s]", newMsg.RunId) workConn.Warn("No client control found for run id [%s]", newMsg.RunId)
return return
@ -372,14 +381,6 @@ func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkCo
} }
func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error { func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error {
return svr.visitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey, return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression) newMsg.UseEncryption, newMsg.UseCompression)
} }
func (svr *Service) RegisterProxy(name string, pxy Proxy) error {
return svr.pxyManager.Add(name, pxy)
}
func (svr *Service) DelProxy(name string) {
svr.pxyManager.Del(name)
}