api: add server web api for statistics

This commit is contained in:
fatedier 2017-03-23 02:01:25 +08:00
parent 9e683fe446
commit a4fece3f51
14 changed files with 738 additions and 426 deletions

View File

@ -1,153 +0,0 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"fmt"
"io"
"net"
"sync"
"time"
"github.com/fatedier/frp/src/models/msg"
"github.com/fatedier/frp/src/utils/conn"
"github.com/fatedier/frp/src/utils/pool"
)
type UdpProcesser struct {
tcpConn *conn.Conn
closeCh chan struct{}
localAddr string
// cache local udp connections
// key is remoteAddr
localUdpConns map[string]*net.UDPConn
mutex sync.RWMutex
tcpConnMutex sync.RWMutex
}
func NewUdpProcesser(c *conn.Conn, localIp string, localPort int64) *UdpProcesser {
return &UdpProcesser{
tcpConn: c,
closeCh: make(chan struct{}),
localAddr: fmt.Sprintf("%s:%d", localIp, localPort),
localUdpConns: make(map[string]*net.UDPConn),
}
}
func (up *UdpProcesser) UpdateTcpConn(c *conn.Conn) {
up.tcpConnMutex.Lock()
defer up.tcpConnMutex.Unlock()
up.tcpConn = c
}
func (up *UdpProcesser) Run() {
go up.ReadLoop()
}
func (up *UdpProcesser) ReadLoop() {
var (
buf string
err error
)
for {
udpPacket := &msg.UdpPacket{}
// read udp package from frps
buf, err = up.tcpConn.ReadLine()
if err != nil {
if err == io.EOF {
return
} else {
continue
}
}
err = udpPacket.UnPack([]byte(buf))
if err != nil {
continue
}
// write to local udp port
sendConn, ok := up.GetUdpConn(udpPacket.SrcStr)
if !ok {
dstAddr, err := net.ResolveUDPAddr("udp", up.localAddr)
if err != nil {
continue
}
sendConn, err = net.DialUDP("udp", nil, dstAddr)
if err != nil {
continue
}
up.SetUdpConn(udpPacket.SrcStr, sendConn)
}
_, err = sendConn.Write(udpPacket.Content)
if err != nil {
sendConn.Close()
continue
}
if !ok {
go up.Forward(udpPacket, sendConn)
}
}
}
func (up *UdpProcesser) Forward(udpPacket *msg.UdpPacket, singleConn *net.UDPConn) {
addr := udpPacket.SrcStr
defer up.RemoveUdpConn(addr)
buf := pool.GetBuf(2048)
for {
singleConn.SetReadDeadline(time.Now().Add(120 * time.Second))
n, remoteAddr, err := singleConn.ReadFromUDP(buf)
if err != nil {
return
}
// forward to frps
forwardPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, udpPacket.Src)
up.tcpConnMutex.RLock()
err = up.tcpConn.WriteString(string(forwardPacket.Pack()) + "\n")
up.tcpConnMutex.RUnlock()
if err != nil {
return
}
}
}
func (up *UdpProcesser) GetUdpConn(addr string) (singleConn *net.UDPConn, ok bool) {
up.mutex.RLock()
defer up.mutex.RUnlock()
singleConn, ok = up.localUdpConns[addr]
return
}
func (up *UdpProcesser) SetUdpConn(addr string, conn *net.UDPConn) {
up.mutex.Lock()
defer up.mutex.Unlock()
up.localUdpConns[addr] = conn
}
func (up *UdpProcesser) RemoveUdpConn(addr string) {
up.mutex.Lock()
defer up.mutex.Unlock()
if c, ok := up.localUdpConns[addr]; ok {
c.Close()
}
delete(up.localUdpConns, addr)
}

View File

@ -113,5 +113,6 @@ func main() {
if config.ServerCommonCfg.PrivilegeMode == true {
log.Info("PrivilegeMode is enabled, you should pay more attention to security issues")
}
server.ServerService = svr
svr.Run()
}

View File

@ -88,11 +88,11 @@ func NewProxyConfFromFile(name string, section ini.Section) (cfg ProxyConf, err
// BaseProxy info
type BaseProxyConf struct {
ProxyName string
ProxyType string
ProxyName string `json:"proxy_name"`
ProxyType string `json:"proxy_type"`
UseEncryption bool
UseCompression bool
UseEncryption bool `json:"use_encryption"`
UseCompression bool `json:"use_compression"`
}
func (cfg *BaseProxyConf) GetName() string {
@ -139,8 +139,8 @@ func (cfg *BaseProxyConf) UnMarshalToMsg(pMsg *msg.NewProxy) {
// Bind info
type BindInfoConf struct {
BindAddr string
RemotePort int64
BindAddr string `json:"bind_addr"`
RemotePort int64 `json:"remote_port"`
}
func (cfg *BindInfoConf) LoadFromMsg(pMsg *msg.NewProxy) {
@ -178,8 +178,8 @@ func (cfg *BindInfoConf) check() (err error) {
// Domain info
type DomainConf struct {
CustomDomains []string
SubDomain string
CustomDomains []string `json:"custom_domains"`
SubDomain string `json:"sub_domain"`
}
func (cfg *DomainConf) LoadFromMsg(pMsg *msg.NewProxy) {
@ -235,8 +235,8 @@ func (cfg *DomainConf) check() (err error) {
}
type LocalSvrConf struct {
LocalIp string
LocalPort int
LocalIp string `json:"-"`
LocalPort int `json:"-"`
}
func (cfg *LocalSvrConf) LoadFromFile(name string, section ini.Section) (err error) {
@ -333,10 +333,10 @@ type HttpProxyConf struct {
LocalSvrConf
Locations []string
HostHeaderRewrite string
HttpUser string
HttpPwd string
Locations []string `json:"locations"`
HostHeaderRewrite string `json:"host_header_rewrite"`
HttpUser string `json:"-"`
HttpPwd string `json:"-"`
}
func (cfg *HttpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) {

View File

@ -15,10 +15,12 @@
package consts
var (
// server status
// proxy status
Idle string = "idle"
Working string = "working"
Closed string = "closed"
Online string = "online"
Offline string = "offline"
// proxy type
TcpProxy string = "tcp"

View File

@ -1,221 +0,0 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
/*
var (
DailyDataKeepDays int = 7
ServerMetricInfoMap map[string]*ServerMetric
smMutex sync.RWMutex
)
type ServerMetric struct {
Name string `json:"name"`
Type string `json:"type"`
BindAddr string `json:"bind_addr"`
ListenPort int64 `json:"listen_port"`
CustomDomains []string `json:"custom_domains"`
Locations []string `json:"locations"`
Status string `json:"status"`
UseEncryption bool `json:"use_encryption"`
UseGzip bool `json:"use_gzip"`
PrivilegeMode bool `json:"privilege_mode"`
// statistics
CurrentConns int64 `json:"current_conns"`
Daily []*DailyServerStats `json:"daily"`
mutex sync.RWMutex
}
type DailyServerStats struct {
Time string `json:"time"`
FlowIn int64 `json:"flow_in"`
FlowOut int64 `json:"flow_out"`
TotalAcceptConns int64 `json:"total_accept_conns"`
}
// for sort
type ServerMetricList []*ServerMetric
func (l ServerMetricList) Len() int { return len(l) }
func (l ServerMetricList) Less(i, j int) bool { return l[i].Name < l[j].Name }
func (l ServerMetricList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
func init() {
ServerMetricInfoMap = make(map[string]*ServerMetric)
}
func (s *ServerMetric) clone() *ServerMetric {
copy := *s
copy.CustomDomains = make([]string, len(s.CustomDomains))
var i int
for i = range copy.CustomDomains {
copy.CustomDomains[i] = s.CustomDomains[i]
}
copy.Daily = make([]*DailyServerStats, len(s.Daily))
for i = range copy.Daily {
tmpDaily := *s.Daily[i]
copy.Daily[i] = &tmpDaily
}
return &copy
}
func GetAllProxyMetrics() []*ServerMetric {
result := make(ServerMetricList, 0)
smMutex.RLock()
for _, metric := range ServerMetricInfoMap {
metric.mutex.RLock()
tmpMetric := metric.clone()
metric.mutex.RUnlock()
result = append(result, tmpMetric)
}
smMutex.RUnlock()
// sort for result by proxy name
sort.Sort(result)
return result
}
// if proxyName isn't exist, return nil
func GetProxyMetrics(proxyName string) *ServerMetric {
smMutex.RLock()
defer smMutex.RUnlock()
metric, ok := ServerMetricInfoMap[proxyName]
if ok {
metric.mutex.RLock()
tmpMetric := metric.clone()
metric.mutex.RUnlock()
return tmpMetric
} else {
return nil
}
}
func SetProxyInfo(proxyName string, proxyType, bindAddr string,
useEncryption, useGzip, privilegeMode bool, customDomains []string,
locations []string, listenPort int64) {
smMutex.Lock()
info, ok := ServerMetricInfoMap[proxyName]
if !ok {
info = &ServerMetric{}
info.Daily = make([]*DailyServerStats, 0)
}
info.Name = proxyName
info.Type = proxyType
info.UseEncryption = useEncryption
info.UseGzip = useGzip
info.PrivilegeMode = privilegeMode
info.BindAddr = bindAddr
info.ListenPort = listenPort
info.CustomDomains = customDomains
info.Locations = locations
ServerMetricInfoMap[proxyName] = info
smMutex.Unlock()
}
func SetStatus(proxyName string, status int64) {
smMutex.RLock()
metric, ok := ServerMetricInfoMap[proxyName]
smMutex.RUnlock()
if ok {
metric.mutex.Lock()
metric.Status = consts.StatusStr[status]
metric.mutex.Unlock()
}
}
type DealFuncType func(*DailyServerStats)
func DealDailyData(dailyData []*DailyServerStats, fn DealFuncType) (newDailyData []*DailyServerStats) {
now := time.Now().Format("20060102")
dailyLen := len(dailyData)
if dailyLen == 0 {
daily := &DailyServerStats{}
daily.Time = now
fn(daily)
dailyData = append(dailyData, daily)
} else {
daily := dailyData[dailyLen-1]
if daily.Time == now {
fn(daily)
} else {
newDaily := &DailyServerStats{}
newDaily.Time = now
fn(newDaily)
if dailyLen == DailyDataKeepDays {
for i := 0; i < dailyLen-1; i++ {
dailyData[i] = dailyData[i+1]
}
dailyData[dailyLen-1] = newDaily
} else {
dailyData = append(dailyData, newDaily)
}
}
}
return dailyData
}
func OpenConnection(proxyName string) {
smMutex.RLock()
metric, ok := ServerMetricInfoMap[proxyName]
smMutex.RUnlock()
if ok {
metric.mutex.Lock()
metric.CurrentConns++
metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) {
stats.TotalAcceptConns++
})
metric.mutex.Unlock()
}
}
func CloseConnection(proxyName string) {
smMutex.RLock()
metric, ok := ServerMetricInfoMap[proxyName]
smMutex.RUnlock()
if ok {
metric.mutex.Lock()
metric.CurrentConns--
metric.mutex.Unlock()
}
}
func AddFlowIn(proxyName string, value int64) {
smMutex.RLock()
metric, ok := ServerMetricInfoMap[proxyName]
smMutex.RUnlock()
if ok {
metric.mutex.Lock()
metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) {
stats.FlowIn += value
})
metric.mutex.Unlock()
}
}
func AddFlowOut(proxyName string, value int64) {
smMutex.RLock()
metric, ok := ServerMetricInfoMap[proxyName]
smMutex.RUnlock()
if ok {
metric.mutex.Lock()
metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) {
stats.FlowOut += value
})
metric.mutex.Unlock()
}
}
*/

View File

@ -1,3 +1,17 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
@ -253,10 +267,13 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.svr.DelProxy(pxy.GetName())
StatsCloseProxy(pxy.GetConf().GetBaseInfo().ProxyType)
}
ctl.allShutdown.Done()
ctl.conn.Info("client exit success")
StatsCloseClient()
}
func (ctl *Control) manager() {
@ -296,6 +313,7 @@ func (ctl *Control) manager() {
ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err)
} else {
ctl.conn.Info("new proxy [%s] success", m.ProxyName)
StatsNewProxy(m.ProxyName, m.ProxyType)
}
ctl.sendCh <- resp
case *msg.Ping:

View File

@ -1,4 +1,4 @@
// Copyright 2016 fatedier, fatedier@gmail.com
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -22,6 +22,8 @@ import (
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/models/config"
"github.com/julienschmidt/httprouter"
)
var (
@ -31,20 +33,27 @@ var (
func RunDashboardServer(addr string, port int64) (err error) {
// url router
mux := http.NewServeMux()
router := httprouter.New()
// api, see dashboard_api.go
//mux.HandleFunc("/api/reload", use(apiReload, basicAuth))
//mux.HandleFunc("/api/proxies", apiProxies)
router.GET("/api/serverinfo", apiServerInfo)
router.GET("/api/proxy/tcp", apiProxyTcp)
router.GET("/api/proxy/udp", apiProxyUdp)
router.GET("/api/proxy/http", apiProxyHttp)
router.GET("/api/proxy/https", apiProxyHttps)
router.GET("/api/proxy/flow/:name", apiProxyFlow)
// view, see dashboard_view.go
mux.Handle("/favicon.ico", http.FileServer(assets.FileSystem))
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))
//mux.HandleFunc("/", use(viewDashboard, basicAuth))
//router.GET("/favicon.ico", http.FileServer(assets.FileSystem))
router.Handler("GET", "/favicon.ico", http.FileServer(assets.FileSystem))
router.Handler("GET", "/static", http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))
//router.GET("/", use(viewDashboard, basicAuth))
address := fmt.Sprintf("%s:%d", addr, port)
server := &http.Server{
Addr: address,
Handler: mux,
Handler: router,
ReadTimeout: httpServerReadTimeout,
WriteTimeout: httpServerWriteTimeout,
}
@ -64,7 +73,6 @@ func use(h http.HandlerFunc, middleware ...func(http.HandlerFunc) http.HandlerFu
for _, m := range middleware {
h = m(h)
}
return h
}

View File

@ -1,4 +1,4 @@
// Copyright 2016 fatedier, fatedier@gmail.com
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -14,14 +14,15 @@
package server
/*
import (
"encoding/json"
"fmt"
"net/http"
"github.com/fatedier/frp/src/models/metric"
"github.com/fatedier/frp/src/utils/log"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/utils/log"
"github.com/julienschmidt/httprouter"
)
type GeneralResponse struct {
@ -29,41 +30,187 @@ type GeneralResponse struct {
Msg string `json:"msg"`
}
func apiReload(w http.ResponseWriter, r *http.Request) {
var buf []byte
res := &GeneralResponse{}
// api/serverinfo
type ServerInfoResp struct {
GeneralResponse
VhostHttpPort int64 `json:"vhost_http_port"`
VhostHttpsPort int64 `json:"vhost_https_port"`
AuthTimeout int64 `json:"auth_timeout"`
SubdomainHost string `json:"subdomain_host"`
MaxPoolCount int64 `json:"max_pool_count"`
HeartBeatTimeout int64 `json:"heart_beat_timeout"`
TotalFlowIn int64 `json:"total_flow_in"`
TotalFlowOut int64 `json:"total_flow_out"`
CurConns int64 `json:"cur_conns"`
ClientCounts int64 `json:"client_counts"`
ProxyTypeCounts map[string]int64 `json:"proxy_type_count"`
}
func apiServerInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res ServerInfoResp
)
defer func() {
log.Info("Http response [/api/reload]: %s", string(buf))
log.Info("Http response [/api/serverinfo]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/reload]")
err := ReloadConf(ConfigFile)
if err != nil {
res.Code = 2
res.Msg = fmt.Sprintf("%v", err)
log.Error("frps reload error: %v", err)
log.Info("Http request: [/api/serverinfo]")
cfg := config.ServerCommonCfg
serverStats := StatsGetServer()
res = ServerInfoResp{
VhostHttpPort: cfg.VhostHttpPort,
VhostHttpsPort: cfg.VhostHttpsPort,
AuthTimeout: cfg.AuthTimeout,
SubdomainHost: cfg.SubDomainHost,
MaxPoolCount: cfg.MaxPoolCount,
HeartBeatTimeout: cfg.HeartBeatTimeout,
TotalFlowIn: serverStats.TotalFlowIn,
TotalFlowOut: serverStats.TotalFlowOut,
CurConns: serverStats.CurConns,
ClientCounts: serverStats.ClientCounts,
ProxyTypeCounts: serverStats.ProxyTypeCounts,
}
buf, _ = json.Marshal(res)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
type ProxiesResponse struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
Proxies []*metric.ServerMetric `json:"proxies"`
// Get proxy info.
type ProxyStatsInfo struct {
Conf config.ProxyConf `json:"conf"`
TodayFlowIn int64 `json:"today_flow_in"`
TodayFlowOut int64 `json:"today_flow_out"`
CurConns int64 `json:"cur_conns"`
Status string `json:"status"`
}
func apiProxies(w http.ResponseWriter, r *http.Request) {
var buf []byte
res := &ProxiesResponse{}
type GetProxyInfoResp struct {
GeneralResponse
Proxies []*ProxyStatsInfo `json:"proxies"`
}
// api/proxy/tcp
func apiProxyTcp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res GetProxyInfoResp
)
defer func() {
log.Info("Http response [/api/proxies]: code [%d]", res.Code)
log.Info("Http response [/api/proxy/tcp]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxy/tcp]")
log.Info("Http request: [/api/proxies]")
res.Proxies = metric.GetAllProxyMetrics()
buf, _ = json.Marshal(res)
res.Proxies = getProxyStatsByType(consts.TcpProxy)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
// api/proxy/udp
func apiProxyUdp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res GetProxyInfoResp
)
defer func() {
log.Info("Http response [/api/proxy/udp]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxy/udp]")
res.Proxies = getProxyStatsByType(consts.UdpProxy)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
// api/proxy/http
func apiProxyHttp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res GetProxyInfoResp
)
defer func() {
log.Info("Http response [/api/proxy/http]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxy/http]")
res.Proxies = getProxyStatsByType(consts.HttpProxy)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
// api/proxy/https
func apiProxyHttps(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res GetProxyInfoResp
)
defer func() {
log.Info("Http response [/api/proxy/https]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxy/https]")
res.Proxies = getProxyStatsByType(consts.HttpsProxy)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
func getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := StatsGetProxiesByType(proxyType)
proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats))
for _, ps := range proxyStats {
proxyInfo := &ProxyStatsInfo{}
if pxy, ok := ServerService.pxyManager.GetByName(ps.Name); ok {
proxyInfo.Conf = pxy.GetConf()
proxyInfo.Status = consts.Online
} else {
proxyInfo.Status = consts.Offline
}
proxyInfo.TodayFlowIn = ps.TodayFlowIn
proxyInfo.TodayFlowOut = ps.TodayFlowOut
proxyInfo.CurConns = ps.CurConns
proxyInfos = append(proxyInfos, proxyInfo)
}
return
}
// api/proxy/:name/flow
type GetProxyFlowResp struct {
GeneralResponse
Name string `json:"name"`
FlowIn []int64 `json:"flow_in"`
FlowOut []int64 `json:"flow_out"`
}
func apiProxyFlow(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
var (
buf []byte
res GetProxyFlowResp
)
name := params.ByName("name")
defer func() {
log.Info("Http response [/api/proxy/flow/:name]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxy/flow/:name]")
res.Name = name
proxyFlowInfo := StatsGetProxyFlow(name)
if proxyFlowInfo == nil {
res.Code = 1
res.Msg = "no proxy info found"
} else {
res.FlowIn = proxyFlowInfo.FlowIn
res.FlowOut = proxyFlowInfo.FlowOut
}
buf, _ = json.Marshal(&res)
w.Write(buf)
}
*/

View File

@ -1,3 +1,17 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (

241
server/metric.go Normal file
View File

@ -0,0 +1,241 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sync"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
var globalStats *ServerStatistics
type ServerStatistics struct {
TotalFlowIn metric.DateCounter
TotalFlowOut metric.DateCounter
CurConns metric.Counter
ClientCounts metric.Counter
ProxyTypeCounts map[string]metric.Counter
ProxyStatistics map[string]*ProxyStatistics
mu sync.Mutex
}
type ProxyStatistics struct {
ProxyType string
FlowIn metric.DateCounter
FlowOut metric.DateCounter
CurConns metric.Counter
}
func init() {
globalStats = &ServerStatistics{
TotalFlowIn: metric.NewDateCounter(ReserveDays),
TotalFlowOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
}
}
func StatsNewClient() {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.ClientCounts.Inc(1)
}
}
func StatsCloseClient() {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.ClientCounts.Dec(1)
}
}
func StatsNewProxy(name string, proxyType string) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
counter, ok := globalStats.ProxyTypeCounts[proxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
globalStats.ProxyTypeCounts[proxyType] = counter
proxyStats, ok := globalStats.ProxyStatistics[name]
if !ok {
proxyStats = &ProxyStatistics{
ProxyType: proxyType,
CurConns: metric.NewCounter(),
FlowIn: metric.NewDateCounter(ReserveDays),
FlowOut: metric.NewDateCounter(ReserveDays),
}
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsCloseProxy(proxyType string) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
if counter, ok := globalStats.ProxyTypeCounts[proxyType]; ok {
counter.Dec(1)
}
}
}
func StatsOpenConnection(name string) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.CurConns.Inc(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Inc(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsCloseConnection(name string) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.CurConns.Dec(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Dec(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddFlowIn(name string, flowIn int64) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.TotalFlowIn.Inc(flowIn)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.FlowIn.Inc(flowIn)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddFlowOut(name string, flowOut int64) {
if config.ServerCommonCfg.DashboardPort != 0 {
globalStats.TotalFlowOut.Inc(flowOut)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.FlowOut.Inc(flowOut)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
// Functions for getting server stats.
type ServerStats struct {
TotalFlowIn int64
TotalFlowOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
func StatsGetServer() *ServerStats {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
s := &ServerStats{
TotalFlowIn: globalStats.TotalFlowIn.TodayCount(),
TotalFlowOut: globalStats.TotalFlowOut.TodayCount(),
CurConns: globalStats.CurConns.Count(),
ClientCounts: globalStats.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range globalStats.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
type ProxyStats struct {
Name string
Type string
TodayFlowIn int64
TodayFlowOut int64
CurConns int64
}
func StatsGetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, proxyStats := range globalStats.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayFlowIn: proxyStats.FlowIn.TodayCount(),
TodayFlowOut: proxyStats.FlowOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
res = append(res, ps)
}
return res
}
type ProxyFlowInfo struct {
Name string
FlowIn []int64
FlowOut []int64
}
func StatsGetProxyFlow(name string) (res *ProxyFlowInfo) {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
res = &ProxyFlowInfo{
Name: name,
}
res.FlowIn = proxyStats.FlowIn.GetLastDaysCount(ReserveDays)
res.FlowOut = proxyStats.FlowOut.GetLastDaysCount(ReserveDays)
}
return
}

View File

@ -1,3 +1,17 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
@ -402,6 +416,11 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
tcp.Join(local, userConn)
StatsOpenConnection(pxy.GetName())
inCount, outCount := tcp.Join(local, userConn)
StatsCloseConnection(pxy.GetName())
StatsAddFlowIn(pxy.GetName(), inCount)
StatsAddFlowOut(pxy.GetName(), outCount)
pxy.Debug("join connections closed")
}

View File

@ -1,3 +1,17 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
@ -14,6 +28,8 @@ import (
"github.com/fatedier/frp/utils/vhost"
)
var ServerService *Service
// Server service.
type Service struct {
// Accept connections from client.
@ -172,6 +188,9 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
ctlConn.AddLogPrefix(loginMsg.RunId)
ctl.Start()
// for statistics
StatsNewClient()
return
}

60
utils/metric/counter.go Normal file
View File

@ -0,0 +1,60 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sync/atomic"
)
type Counter interface {
Count() int64
Inc(int64)
Dec(int64)
Snapshot() Counter
Clear()
}
func NewCounter() Counter {
return &StandardCounter{
count: 0,
}
}
type StandardCounter struct {
count int64
}
func (c *StandardCounter) Count() int64 {
return atomic.LoadInt64(&c.count)
}
func (c *StandardCounter) Inc(count int64) {
atomic.AddInt64(&c.count, count)
}
func (c *StandardCounter) Dec(count int64) {
atomic.AddInt64(&c.count, -count)
}
func (c *StandardCounter) Snapshot() Counter {
tmp := &StandardCounter{
count: atomic.LoadInt64(&c.count),
}
return tmp
}
func (c *StandardCounter) Clear() {
atomic.StoreInt64(&c.count, 0)
}

View File

@ -0,0 +1,157 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sync"
"time"
)
type DateCounter interface {
TodayCount() int64
GetLastDaysCount(lastdays int64) []int64
Inc(int64)
Dec(int64)
Snapshot() DateCounter
Clear()
Close()
}
func NewDateCounter(reserveDays int64) DateCounter {
if reserveDays <= 0 {
reserveDays = 1
}
return newStandardDateCounter(reserveDays)
}
type StandardDateCounter struct {
reserveDays int64
counts []int64
closeCh chan struct{}
closed bool
mu sync.Mutex
}
func newStandardDateCounter(reserveDays int64) *StandardDateCounter {
s := &StandardDateCounter{
reserveDays: reserveDays,
counts: make([]int64, reserveDays),
closeCh: make(chan struct{}),
}
s.startRotateWorker()
return s
}
func (c *StandardDateCounter) TodayCount() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.counts[0]
}
func (c *StandardDateCounter) GetLastDaysCount(lastdays int64) []int64 {
if lastdays > c.reserveDays {
lastdays = c.reserveDays
}
counts := make([]int64, lastdays)
c.mu.Lock()
defer c.mu.Unlock()
for i := 0; i < int(lastdays); i++ {
counts[i] = c.counts[i]
}
return counts
}
func (c *StandardDateCounter) Inc(count int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.counts[0] += count
}
func (c *StandardDateCounter) Dec(count int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.counts[0] -= count
}
func (c *StandardDateCounter) Snapshot() DateCounter {
c.mu.Lock()
defer c.mu.Unlock()
tmp := &StandardDateCounter{
reserveDays: c.reserveDays,
counts: make([]int64, c.reserveDays),
}
for i := 0; i < int(c.reserveDays); i++ {
tmp.counts[i] = c.counts[i]
}
return tmp
}
func (c *StandardDateCounter) Clear() {
c.mu.Lock()
defer c.mu.Unlock()
for i := 0; i < int(c.reserveDays); i++ {
c.counts[i] = 0
}
}
func (c *StandardDateCounter) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.closed {
close(c.closeCh)
c.closed = true
}
}
func (c *StandardDateCounter) rotate() {
c.mu.Lock()
defer c.mu.Unlock()
newCounts := make([]int64, c.reserveDays)
for i := 1; i < int(c.reserveDays-1); i++ {
newCounts[i] = c.counts[i+1]
}
c.counts = newCounts
}
func (c *StandardDateCounter) startRotateWorker() {
now := time.Now()
nextDayTimeStr := now.Add(24 * time.Hour).Format("20060102")
nextDay, _ := time.Parse("20060102", nextDayTimeStr)
d := nextDay.Sub(now)
firstTimer := time.NewTimer(d)
rotateTicker := time.NewTicker(24 * time.Hour)
go func() {
for {
select {
case <-firstTimer.C:
firstTimer.Stop()
rotateTicker.Stop()
rotateTicker = time.NewTicker(24 * time.Hour)
c.rotate()
case <-rotateTicker.C:
c.rotate()
case <-c.closeCh:
break
}
}
firstTimer.Stop()
rotateTicker.Stop()
}()
}