From cf66ca10b4f318eacaf59733909902172a2ba4f0 Mon Sep 17 00:00:00 2001 From: fatedier Date: Wed, 19 Oct 2022 12:14:35 +0800 Subject: [PATCH] improve http group load balancing (#3131) --- pkg/util/version/version.go | 2 +- pkg/util/vhost/http.go | 79 +++++++++++++++++++------------------ pkg/util/vhost/vhost.go | 23 ++++++++--- server/group/http.go | 39 +++++++++++++++++- test/e2e/features/group.go | 23 +++++++++++ 5 files changed, 119 insertions(+), 47 deletions(-) diff --git a/pkg/util/version/version.go b/pkg/util/version/version.go index b8b7be8..f2805c5 100644 --- a/pkg/util/version/version.go +++ b/pkg/util/version/version.go @@ -19,7 +19,7 @@ import ( "strings" ) -var version = "0.44.0" +var version = "0.45.0" func Full() string { return version diff --git a/pkg/util/vhost/http.go b/pkg/util/vhost/http.go index 846fdca..5addb47 100644 --- a/pkg/util/vhost/http.go +++ b/pkg/util/vhost/http.go @@ -60,19 +60,28 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) * // Modify incoming requests by route policies. Director: func(req *http.Request) { req.URL.Scheme = "http" - url := req.Context().Value(RouteInfoURL).(string) - routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string) - oldHost, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string)) - rc := rp.GetRouteConfig(oldHost, url, routeByHTTPUser) + reqRouteInfo := req.Context().Value(RouteInfoKey).(*RequestRouteInfo) + oldHost, _ := util.CanonicalHost(reqRouteInfo.Host) + + rc := rp.GetRouteConfig(oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser) if rc != nil { if rc.RewriteHost != "" { req.Host = rc.RewriteHost } - // Set {domain}.{location}.{routeByHTTPUser} as URL host here to let http transport reuse connections. - // TODO(fatedier): use proxy name instead? + + var endpoint string + if rc.ChooseEndpointFn != nil { + // ignore error here, it will use CreateConnFn instead later + endpoint, _ = rc.ChooseEndpointFn() + reqRouteInfo.Endpoint = endpoint + frpLog.Trace("choose endpoint name [%s] for http request host [%s] path [%s] httpuser [%s]", + endpoint, oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser) + } + // Set {domain}.{location}.{routeByHTTPUser}.{endpoint} as URL host here to let http transport reuse connections. req.URL.Host = rc.Domain + "." + base64.StdEncoding.EncodeToString([]byte(rc.Location)) + "." + - base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser)) + base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser)) + "." + + base64.StdEncoding.EncodeToString([]byte(endpoint)) for k, v := range rc.Headers { req.Header.Set(k, v) @@ -85,12 +94,9 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) * Transport: &http.Transport{ ResponseHeaderTimeout: rp.responseHeaderTimeout, IdleConnTimeout: 60 * time.Second, + MaxIdleConnsPerHost: 5, DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - url := ctx.Value(RouteInfoURL).(string) - host, _ := util.CanonicalHost(ctx.Value(RouteInfoHost).(string)) - routerByHTTPUser := ctx.Value(RouteInfoHTTPUser).(string) - remote := ctx.Value(RouteInfoRemote).(string) - return rp.CreateConnection(host, url, routerByHTTPUser, remote) + return rp.CreateConnection(ctx.Value(RouteInfoKey).(*RequestRouteInfo), true) }, Proxy: func(req *http.Request) (*url.URL, error) { // Use proxy mode if there is host in HTTP first request line. @@ -100,7 +106,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) * // Normal: // GET / HTTP/1.1 // Host: example.com - urlHost := req.Context().Value(RouteInfoURLHost).(string) + urlHost := req.Context().Value(RouteInfoKey).(*RequestRouteInfo).URLHost if urlHost != "" { return req.URL, nil } @@ -143,14 +149,6 @@ func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser str return nil } -func (rp *HTTPReverseProxy) GetRealHost(domain, location, routeByHTTPUser string) (host string) { - vr, ok := rp.getVhost(domain, location, routeByHTTPUser) - if ok { - host = vr.payload.(*RouteConfig).RewriteHost - } - return -} - func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string) (headers map[string]string) { vr, ok := rp.getVhost(domain, location, routeByHTTPUser) if ok { @@ -160,15 +158,22 @@ func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string) } // CreateConnection create a new connection by route config -func (rp *HTTPReverseProxy) CreateConnection(domain, location, routeByHTTPUser string, remoteAddr string) (net.Conn, error) { - vr, ok := rp.getVhost(domain, location, routeByHTTPUser) +func (rp *HTTPReverseProxy) CreateConnection(reqRouteInfo *RequestRouteInfo, byEndpoint bool) (net.Conn, error) { + host, _ := util.CanonicalHost(reqRouteInfo.Host) + vr, ok := rp.getVhost(host, reqRouteInfo.URL, reqRouteInfo.HTTPUser) if ok { + if byEndpoint { + fn := vr.payload.(*RouteConfig).CreateConnByEndpointFn + if fn != nil { + return fn(reqRouteInfo.Endpoint, reqRouteInfo.RemoteAddr) + } + } fn := vr.payload.(*RouteConfig).CreateConnFn if fn != nil { - return fn(remoteAddr) + return fn(reqRouteInfo.RemoteAddr) } } - return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, domain, location, routeByHTTPUser) + return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, host, reqRouteInfo.URL, reqRouteInfo.HTTPUser) } func (rp *HTTPReverseProxy) CheckAuth(domain, location, routeByHTTPUser, user, passwd string) bool { @@ -244,12 +249,7 @@ func (rp *HTTPReverseProxy) connectHandler(rw http.ResponseWriter, req *http.Req return } - url := req.Context().Value(RouteInfoURL).(string) - routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string) - domain, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string)) - remoteAddr := req.Context().Value(RouteInfoRemote).(string) - - remote, err := rp.CreateConnection(domain, url, routeByHTTPUser, remoteAddr) + remote, err := rp.CreateConnection(req.Context().Value(RouteInfoKey).(*RequestRouteInfo), false) if err != nil { _ = notFoundResponse().Write(client) client.Close() @@ -278,11 +278,6 @@ func parseBasicAuth(auth string) (username, password string, ok bool) { } func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Request { - newctx := req.Context() - newctx = context.WithValue(newctx, RouteInfoURL, req.URL.Path) - newctx = context.WithValue(newctx, RouteInfoHost, req.Host) - newctx = context.WithValue(newctx, RouteInfoURLHost, req.URL.Host) - user := "" // If url host isn't empty, it's a proxy request. Get http user from Proxy-Authorization header. if req.URL.Host != "" { @@ -294,8 +289,16 @@ func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Requ if user == "" { user, _, _ = req.BasicAuth() } - newctx = context.WithValue(newctx, RouteInfoHTTPUser, user) - newctx = context.WithValue(newctx, RouteInfoRemote, req.RemoteAddr) + + reqRouteInfo := &RequestRouteInfo{ + URL: req.URL.Path, + Host: req.Host, + HTTPUser: user, + RemoteAddr: req.RemoteAddr, + URLHost: req.URL.Host, + } + newctx := req.Context() + newctx = context.WithValue(newctx, RouteInfoKey, reqRouteInfo) return req.Clone(newctx) } diff --git a/pkg/util/vhost/vhost.go b/pkg/util/vhost/vhost.go index 5069005..651d960 100644 --- a/pkg/util/vhost/vhost.go +++ b/pkg/util/vhost/vhost.go @@ -29,13 +29,18 @@ import ( type RouteInfo string const ( - RouteInfoURL RouteInfo = "url" - RouteInfoHost RouteInfo = "host" - RouteInfoHTTPUser RouteInfo = "httpUser" - RouteInfoRemote RouteInfo = "remote" - RouteInfoURLHost RouteInfo = "urlHost" + RouteInfoKey RouteInfo = "routeInfo" ) +type RequestRouteInfo struct { + URL string + Host string + HTTPUser string + RemoteAddr string + URLHost string + Endpoint string +} + type ( muxFunc func(net.Conn) (net.Conn, map[string]string, error) httpAuthFunc func(net.Conn, string, string, string) (bool, error) @@ -75,8 +80,12 @@ func NewMuxer( return mux, nil } +type ChooseEndpointFunc func() (string, error) + type CreateConnFunc func(remoteAddr string) (net.Conn, error) +type CreateConnByEndpointFunc func(endpoint, remoteAddr string) (net.Conn, error) + // RouteConfig is the params used to match HTTP requests type RouteConfig struct { Domain string @@ -87,7 +96,9 @@ type RouteConfig struct { Headers map[string]string RouteByHTTPUser string - CreateConnFn CreateConnFunc + CreateConnFn CreateConnFunc + ChooseEndpointFn ChooseEndpointFunc + CreateConnByEndpointFn CreateConnByEndpointFunc } // listen for a new domain name, if rewriteHost is not empty and rewriteFunc is not nil diff --git a/server/group/http.go b/server/group/http.go index 7e081eb..6409192 100644 --- a/server/group/http.go +++ b/server/group/http.go @@ -10,7 +10,7 @@ import ( ) type HTTPGroupController struct { - // groups by indexKey + // groups indexed by group name groups map[string]*HTTPGroup // register createConn for each group to vhostRouter. @@ -65,7 +65,7 @@ type HTTPGroup struct { location string routeByHTTPUser string - // CreateConnFuncs indexed by echo proxy name + // CreateConnFuncs indexed by proxy name createFuncs map[string]vhost.CreateConnFunc pxyNames []string index uint64 @@ -91,6 +91,8 @@ func (g *HTTPGroup) Register( // the first proxy in this group tmp := routeConfig // copy object tmp.CreateConnFn = g.createConn + tmp.ChooseEndpointFn = g.chooseEndpoint + tmp.CreateConnByEndpointFn = g.createConnByEndpoint err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, routeConfig.RouteByHTTPUser, &tmp) if err != nil { return @@ -161,3 +163,36 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) { return f(remoteAddr) } + +func (g *HTTPGroup) chooseEndpoint() (string, error) { + newIndex := atomic.AddUint64(&g.index, 1) + name := "" + + g.mu.RLock() + group := g.group + domain := g.domain + location := g.location + routeByHTTPUser := g.routeByHTTPUser + if len(g.pxyNames) > 0 { + name = g.pxyNames[int(newIndex)%len(g.pxyNames)] + } + g.mu.RUnlock() + + if name == "" { + return "", fmt.Errorf("no healthy endpoint for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]", + group, domain, location, routeByHTTPUser) + } + return name, nil +} + +func (g *HTTPGroup) createConnByEndpoint(endpoint, remoteAddr string) (net.Conn, error) { + var f vhost.CreateConnFunc + g.mu.RLock() + f = g.createFuncs[endpoint] + g.mu.RUnlock() + + if f == nil { + return nil, fmt.Errorf("no CreateConnFunc for endpoint [%s] in group [%s]", endpoint, g.group) + } + return f(remoteAddr) +} diff --git a/test/e2e/features/group.go b/test/e2e/features/group.go index f66fae0..f36e75b 100644 --- a/test/e2e/features/group.go +++ b/test/e2e/features/group.go @@ -217,6 +217,29 @@ var _ = ginkgo.Describe("[Feature: Group]", func() { f.RunProcesses([]string{serverConf}, []string{clientConf}) + // send first HTTP request + var contents []string + framework.NewRequestExpect(f).Port(vhostPort). + RequestModify(func(r *request.Request) { + r.HTTP().HTTPHost("example.com") + }). + Ensure(func(resp *request.Response) bool { + contents = append(contents, string(resp.Content)) + return true + }) + + // send second HTTP request, should be forwarded to another service + framework.NewRequestExpect(f).Port(vhostPort). + RequestModify(func(r *request.Request) { + r.HTTP().HTTPHost("example.com") + }). + Ensure(func(resp *request.Response) bool { + contents = append(contents, string(resp.Content)) + return true + }) + + framework.ExpectContainElements(contents, []string{"foo", "bar"}) + // check foo and bar is ok results := doFooBarHTTPRequest(vhostPort, "example.com") framework.ExpectContainElements(results, []string{"foo", "bar"})