diff --git a/README.md b/README.md index ab9d835..2890d64 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,8 @@ frp also has a P2P connect mode. * [Get proxy status from client](#get-proxy-status-from-client) * [Only allowing certain ports on the server](#only-allowing-certain-ports-on-the-server) * [Port Reuse](#port-reuse) + * [Bandwidth Limit](#bandwidth-limit) + * [For Each Proxy](#for-each-proxy) * [TCP Stream Multiplexing](#tcp-stream-multiplexing) * [Support KCP Protocol](#support-kcp-protocol) * [Connection Pooling](#connection-pooling) @@ -495,6 +497,21 @@ allow_ports = 2000-3000,3001,3003,4000-50000 We would like to try to allow multiple proxies bind a same remote port with different protocols in the future. +### Bandwidth Limit + +#### For Each Proxy + +```ini +# frpc.ini +[ssh] +type = tcp +local_port = 22 +remote_port = 6000 +bandwidth_limit = 1MB +``` + +Set `bandwidth_limit` in each proxy's configure to enable this feature. Supported units are `MB` and `KB`. + ### TCP Stream Multiplexing frp supports tcp stream multiplexing since v0.10.0 like HTTP2 Multiplexing, in which case all logic connections to the same frpc are multiplexed into the same TCP connection. diff --git a/README_zh.md b/README_zh.md index a9e03cb..6f62e38 100644 --- a/README_zh.md +++ b/README_zh.md @@ -33,6 +33,8 @@ frp 是一个可用于内网穿透的高性能的反向代理应用,支持 tcp * [客户端查看代理状态](#客户端查看代理状态) * [端口白名单](#端口白名单) * [端口复用](#端口复用) + * [限速](#限速) + * [代理限速](#代理限速) * [TCP 多路复用](#tcp-多路复用) * [底层通信可选 kcp 协议](#底层通信可选-kcp-协议) * [连接池](#连接池) @@ -531,6 +533,23 @@ allow_ports = 2000-3000,3001,3003,4000-50000 后续会尝试允许多个 proxy 绑定同一个远端端口的不同协议。 +### 限速 + +#### 代理限速 + +目前支持在客户端的代理配置中设置代理级别的限速,限制单个 proxy 可以占用的带宽。 + +```ini +# frpc.ini +[ssh] +type = tcp +local_port = 22 +remote_port = 6000 +bandwith_limit = 1MB +``` + +在代理配置中增加 `bandwith_limit` 字段启用此功能,目前仅支持 `MB` 和 `KB` 单位。 + ### TCP 多路复用 从 v0.10.0 版本开始,客户端和服务器端之间的连接支持多路复用,不再需要为每一个用户请求创建一个连接,使连接建立的延迟降低,并且避免了大量文件描述符的占用,使 frp 可以承载更高的并发数。 diff --git a/assets/assets.go b/assets/assets.go index 504c0e8..2a9e977 100644 --- a/assets/assets.go +++ b/assets/assets.go @@ -55,6 +55,7 @@ func ReadFile(file string) (content string, err error) { if err != nil { return content, err } + defer file.Close() buf, err := ioutil.ReadAll(file) if err != nil { return content, err @@ -65,6 +66,7 @@ func ReadFile(file string) (content string, err error) { if err != nil { return content, err } + defer file.Close() buf, err := ioutil.ReadAll(file) if err != nil { return content, err diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 2da68ce..268b317 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -30,6 +30,7 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/plugin" "github.com/fatedier/frp/models/proto/udp" + "github.com/fatedier/frp/utils/limit" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/xlog" @@ -38,6 +39,7 @@ import ( "github.com/fatedier/golib/pool" fmux "github.com/hashicorp/yamux" pp "github.com/pires/go-proxyproto" + "golang.org/x/time/rate" ) // Proxy defines how to handle work connections for different proxy type. @@ -51,9 +53,16 @@ type Proxy interface { } func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) { + var limiter *rate.Limiter + limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes() + if limitBytes > 0 { + limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) + } + baseProxy := BaseProxy{ clientCfg: clientCfg, serverUDPPort: serverUDPPort, + limiter: limiter, xl: xlog.FromContextSafe(ctx), ctx: ctx, } @@ -96,6 +105,7 @@ type BaseProxy struct { closed bool clientCfg config.ClientCommonConf serverUDPPort int + limiter *rate.Limiter mu sync.RWMutex xl *xlog.Logger @@ -127,8 +137,8 @@ func (pxy *TcpProxy) Close() { } func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // HTTP @@ -156,8 +166,8 @@ func (pxy *HttpProxy) Close() { } func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // HTTPS @@ -185,8 +195,8 @@ func (pxy *HttpsProxy) Close() { } func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // STCP @@ -214,8 +224,8 @@ func (pxy *StcpProxy) Close() { } func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // XTCP @@ -360,7 +370,7 @@ func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { return } - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, muxConn, []byte(pxy.cfg.Sk), m) } @@ -429,6 +439,13 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // close resources releated with old workConn pxy.Close() + if pxy.limiter != nil { + rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + return conn.Close() + }) + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) + } + pxy.mu.Lock() pxy.workConn = conn pxy.readCh = make(chan *msg.UdpPacket, 1024) @@ -491,13 +508,18 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // Common handler for tcp work connections. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, - baseInfo *config.BaseProxyConf, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { + baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { xl := xlog.FromContextSafe(ctx) var ( remote io.ReadWriteCloser err error ) remote = workConn + if limiter != nil { + remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error { + return workConn.Close() + }) + } xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t", baseInfo.UseEncryption, baseInfo.UseCompression) diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 158ff23..14ca6ed 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -71,6 +71,8 @@ tls_enable = true type = tcp local_ip = 127.0.0.1 local_port = 22 +# limit bandwith for this proxy, unit is KB and MB +bandwith_limit = 1MB # true or false, if true, messages between frps and frpc will be encrypted, default is false use_encryption = false # if true, message will be compressed @@ -205,6 +207,14 @@ plugin_key_path = ./server.key plugin_host_header_rewrite = 127.0.0.1 plugin_header_X-From-Where = frp +[plugin_http2https] +type = http +custom_domains = test.yourdomain.com +plugin = http2https +plugin_local_addr = 127.0.0.1:443 +plugin_host_header_rewrite = 127.0.0.1 +plugin_header_X-From-Where = frp + [secret_tcp] # If the type is secret tcp, remote_port is useless # Who want to connect local port should deploy another frpc with stcp proxy and role is visitor diff --git a/go.mod b/go.mod index 81de8ab..a71a97c 100644 --- a/go.mod +++ b/go.mod @@ -29,4 +29,5 @@ require ( github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/text v0.3.2 // indirect + golang.org/x/time v0.0.0-20191024005414-555d28b269f0 ) diff --git a/go.sum b/go.sum index b9bdf2a..26c9004 100644 --- a/go.sum +++ b/go.sum @@ -46,4 +46,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/models/config/proxy.go b/models/config/proxy.go index 9ab1ef8..85c353e 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -125,6 +125,11 @@ type BaseProxyConf struct { // values include "v1", "v2", and "". If the value is "", a protocol // version will be automatically selected. By default, this value is "". ProxyProtocolVersion string `json:"proxy_protocol_version"` + + // BandwidthLimit limit the proxy bandwidth + // 0 means no limit + BandwidthLimit BandwidthQuantity `json:"bandwidth_limit"` + LocalSvrConf HealthCheckConf } @@ -140,7 +145,8 @@ func (cfg *BaseProxyConf) compare(cmp *BaseProxyConf) bool { cfg.UseCompression != cmp.UseCompression || cfg.Group != cmp.Group || cfg.GroupKey != cmp.GroupKey || - cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion { + cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion || + cfg.BandwidthLimit.Equal(&cmp.BandwidthLimit) { return false } if !cfg.LocalSvrConf.compare(&cmp.LocalSvrConf) { @@ -165,6 +171,7 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i var ( tmpStr string ok bool + err error ) cfg.ProxyName = prefix + name cfg.ProxyType = section["type"] @@ -183,11 +190,15 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i cfg.GroupKey = section["group_key"] cfg.ProxyProtocolVersion = section["proxy_protocol_version"] - if err := cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + if cfg.BandwidthLimit, err = NewBandwidthQuantity(section["bandwidth_limit"]); err != nil { return err } - if err := cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { + if err = cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + return err + } + + if err = cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { return err } diff --git a/models/config/types.go b/models/config/types.go new file mode 100644 index 0000000..87c240d --- /dev/null +++ b/models/config/types.go @@ -0,0 +1,112 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "errors" + "strconv" + "strings" +) + +const ( + MB = 1024 * 1024 + KB = 1024 +) + +type BandwidthQuantity struct { + s string // MB or KB + + i int64 // bytes +} + +func NewBandwidthQuantity(s string) (BandwidthQuantity, error) { + q := BandwidthQuantity{} + err := q.UnmarshalString(s) + if err != nil { + return q, err + } + return q, nil +} + +func (q *BandwidthQuantity) Equal(u *BandwidthQuantity) bool { + if q == nil && u == nil { + return true + } + if q != nil && u != nil { + return q.i == u.i + } + return false +} + +func (q *BandwidthQuantity) String() string { + return q.s +} + +func (q *BandwidthQuantity) UnmarshalString(s string) error { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + + var ( + base int64 + f float64 + err error + ) + if strings.HasSuffix(s, "MB") { + base = MB + fstr := strings.TrimSuffix(s, "MB") + f, err = strconv.ParseFloat(fstr, 64) + if err != nil { + return err + } + } else if strings.HasSuffix(s, "KB") { + base = KB + fstr := strings.TrimSuffix(s, "KB") + f, err = strconv.ParseFloat(fstr, 64) + if err != nil { + return err + } + } else { + return errors.New("unit not support") + } + + q.s = s + q.i = int64(f * float64(base)) + return nil +} + +func (q *BandwidthQuantity) UnmarshalJSON(b []byte) error { + if len(b) == 4 && string(b) == "null" { + return nil + } + + var str string + err := json.Unmarshal(b, &str) + if err != nil { + return err + } + + return q.UnmarshalString(str) +} + +func (q *BandwidthQuantity) MarshalJSON() ([]byte, error) { + return []byte("\"" + q.s + "\""), nil +} + +func (q *BandwidthQuantity) Bytes() int64 { + return q.i +} diff --git a/models/config/types_test.go b/models/config/types_test.go new file mode 100644 index 0000000..ab03dfd --- /dev/null +++ b/models/config/types_test.go @@ -0,0 +1,40 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +type Wrap struct { + B BandwidthQuantity `json:"b"` + Int int `json:"int"` +} + +func TestBandwidthQuantity(t *testing.T) { + assert := assert.New(t) + + var w Wrap + err := json.Unmarshal([]byte(`{"b":"1KB","int":5}`), &w) + assert.NoError(err) + assert.EqualValues(1*KB, w.B.Bytes()) + + buf, err := json.Marshal(&w) + assert.NoError(err) + assert.Equal(`{"b":"1KB","int":5}`, string(buf)) +} diff --git a/models/plugin/http2https.go b/models/plugin/http2https.go new file mode 100644 index 0000000..6f5965e --- /dev/null +++ b/models/plugin/http2https.go @@ -0,0 +1,112 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "strings" + + frpNet "github.com/fatedier/frp/utils/net" +) + +const PluginHTTP2HTTPS = "http2https" + +func init() { + Register(PluginHTTP2HTTPS, NewHTTP2HTTPSPlugin) +} + +type HTTP2HTTPSPlugin struct { + hostHeaderRewrite string + localAddr string + headers map[string]string + + l *Listener + s *http.Server +} + +func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) { + localAddr := params["plugin_local_addr"] + hostHeaderRewrite := params["plugin_host_header_rewrite"] + headers := make(map[string]string) + for k, v := range params { + if !strings.HasPrefix(k, "plugin_header_") { + continue + } + if k = strings.TrimPrefix(k, "plugin_header_"); k != "" { + headers[k] = v + } + } + + if localAddr == "" { + return nil, fmt.Errorf("plugin_local_addr is required") + } + + listener := NewProxyListener() + + p := &HTTPS2HTTPPlugin{ + localAddr: localAddr, + hostHeaderRewrite: hostHeaderRewrite, + headers: headers, + l: listener, + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + rp := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = "https" + req.URL.Host = p.localAddr + if p.hostHeaderRewrite != "" { + req.Host = p.hostHeaderRewrite + } + for k, v := range p.headers { + req.Header.Set(k, v) + } + }, + Transport: tr, + } + + p.s = &http.Server{ + Handler: rp, + } + + go p.s.Serve(listener) + + return p, nil +} + + +func (p *HTTP2HTTPSPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) { + wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) + p.l.PutConn(wrapConn) +} + +func (p *HTTP2HTTPSPlugin) Name() string { + return PluginHTTP2HTTPS +} + +func (p *HTTP2HTTPSPlugin) Close() error { + if err := p.s.Close();err != nil { + return err + } + return nil +} diff --git a/models/plugin/https2http.go b/models/plugin/https2http.go index 6554035..af5b6af 100644 --- a/models/plugin/https2http.go +++ b/models/plugin/https2http.go @@ -126,5 +126,8 @@ func (p *HTTPS2HTTPPlugin) Name() string { } func (p *HTTPS2HTTPPlugin) Close() error { + if err := p.s.Close();err != nil { + return err + } return nil } diff --git a/utils/limit/reader.go b/utils/limit/reader.go new file mode 100644 index 0000000..efa828f --- /dev/null +++ b/utils/limit/reader.go @@ -0,0 +1,51 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "context" + "io" + + "golang.org/x/time/rate" +) + +type Reader struct { + r io.Reader + limiter *rate.Limiter +} + +func NewReader(r io.Reader, limiter *rate.Limiter) *Reader { + return &Reader{ + r: r, + limiter: limiter, + } +} + +func (r *Reader) Read(p []byte) (n int, err error) { + b := r.limiter.Burst() + if b < len(p) { + p = p[:b] + } + n, err = r.r.Read(p) + if err != nil { + return + } + + err = r.limiter.WaitN(context.Background(), n) + if err != nil { + return + } + return +} diff --git a/utils/limit/writer.go b/utils/limit/writer.go new file mode 100644 index 0000000..5256d1e --- /dev/null +++ b/utils/limit/writer.go @@ -0,0 +1,60 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "context" + "io" + + "golang.org/x/time/rate" +) + +type Writer struct { + w io.Writer + limiter *rate.Limiter +} + +func NewWriter(w io.Writer, limiter *rate.Limiter) *Writer { + return &Writer{ + w: w, + limiter: limiter, + } +} + +func (w *Writer) Write(p []byte) (n int, err error) { + var nn int + b := w.limiter.Burst() + for { + end := len(p) + if end == 0 { + break + } + if b < len(p) { + end = b + } + err = w.limiter.WaitN(context.Background(), end) + if err != nil { + return + } + + nn, err = w.w.Write(p[:end]) + n += nn + if err != nil { + return + } + p = p[end:] + } + return +} diff --git a/utils/version/version.go b/utils/version/version.go index cc47e55..dde6de9 100644 --- a/utils/version/version.go +++ b/utils/version/version.go @@ -19,7 +19,7 @@ import ( "strings" ) -var version string = "0.29.1" +var version string = "0.30.0" func Full() string { return version diff --git a/vendor/golang.org/x/time/AUTHORS b/vendor/golang.org/x/time/AUTHORS new file mode 100644 index 0000000..15167cd --- /dev/null +++ b/vendor/golang.org/x/time/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/time/CONTRIBUTORS b/vendor/golang.org/x/time/CONTRIBUTORS new file mode 100644 index 0000000..1c4577e --- /dev/null +++ b/vendor/golang.org/x/time/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 0000000..6a66aea --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 0000000..7330990 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 0000000..563f704 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,400 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "context" + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + lim.mu.Lock() + burst := lim.burst + limit := lim.limit + lim.mu.Unlock() + + if n > burst && limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait if necessary + delay := r.DelayFrom(now) + if delay == 0 { + return nil + } + t := time.NewTimer(delay) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). +func (lim *Limiter) SetBurst(newBurst int) { + lim.SetBurstAt(time.Now(), newBurst) +} + +// SetBurstAt sets a new burst size for the limiter. +func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.burst = newBurst +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + // Split the integer and fractional parts ourself to minimize rounding errors. + // See golang.org/issues/34861. + sec := float64(d/time.Second) * float64(limit) + nsec := float64(d%time.Second) * float64(limit) + return sec + nsec/1e9 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 79bbd1c..079212d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -83,3 +83,5 @@ golang.org/x/text/secure/bidirule golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm +# golang.org/x/time v0.0.0-20191024005414-555d28b269f0 +golang.org/x/time/rate