diff --git a/doc/server_plugin.md b/doc/server_plugin.md index 3697053..d73d243 100644 --- a/doc/server_plugin.md +++ b/doc/server_plugin.md @@ -70,7 +70,7 @@ The response can look like any of the following: ### Operation -Currently `Login`, `NewProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. +Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. #### Login @@ -136,6 +136,26 @@ Create new proxy } ``` +#### CloseProxy + +A previously created proxy is closed. + +Please note that one request will be sent for every proxy that is closed, do **NOT** use this +if you have too many proxies bound to a single client, as this may exhaust the server's resources. + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "proxy_name": + } +} +``` + #### Ping Heartbeat from frpc diff --git a/pkg/plugin/server/manager.go b/pkg/plugin/server/manager.go index bc88288..47d11d1 100644 --- a/pkg/plugin/server/manager.go +++ b/pkg/plugin/server/manager.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/xlog" @@ -26,6 +27,7 @@ import ( type Manager struct { loginPlugins []Plugin newProxyPlugins []Plugin + closeProxyPlugins []Plugin pingPlugins []Plugin newWorkConnPlugins []Plugin newUserConnPlugins []Plugin @@ -35,6 +37,7 @@ func NewManager() *Manager { return &Manager{ loginPlugins: make([]Plugin, 0), newProxyPlugins: make([]Plugin, 0), + closeProxyPlugins: make([]Plugin, 0), pingPlugins: make([]Plugin, 0), newWorkConnPlugins: make([]Plugin, 0), newUserConnPlugins: make([]Plugin, 0), @@ -48,6 +51,9 @@ func (m *Manager) Register(p Plugin) { if p.IsSupport(OpNewProxy) { m.newProxyPlugins = append(m.newProxyPlugins, p) } + if p.IsSupport(OpCloseProxy) { + m.closeProxyPlugins = append(m.closeProxyPlugins, p) + } if p.IsSupport(OpPing) { m.pingPlugins = append(m.pingPlugins, p) } @@ -127,6 +133,32 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) { return content, nil } +func (m *Manager) CloseProxy(content *CloseProxyContent) error { + if len(m.closeProxyPlugins) == 0 { + return nil + } + + errs := make([]string, 0) + reqid, _ := util.RandID() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.closeProxyPlugins { + _, _, err := p.Handle(ctx, OpCloseProxy, *content) + if err != nil { + xl.Warn("send CloseProxy request to plugin [%s] error: %v", p.Name(), err) + errs = append(errs, fmt.Sprintf("[%s]: %v", p.Name(), err)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("send CloseProxy request to plugin errors: %s", strings.Join(errs, "; ")) + } else { + return nil + } +} + func (m *Manager) Ping(content *PingContent) (*PingContent, error) { if len(m.pingPlugins) == 0 { return content, nil diff --git a/pkg/plugin/server/plugin.go b/pkg/plugin/server/plugin.go index 160d12a..0d34de5 100644 --- a/pkg/plugin/server/plugin.go +++ b/pkg/plugin/server/plugin.go @@ -23,6 +23,7 @@ const ( OpLogin = "Login" OpNewProxy = "NewProxy" + OpCloseProxy = "CloseProxy" OpPing = "Ping" OpNewWorkConn = "NewWorkConn" OpNewUserConn = "NewUserConn" diff --git a/pkg/plugin/server/types.go b/pkg/plugin/server/types.go index 4df79f4..d7d98cb 100644 --- a/pkg/plugin/server/types.go +++ b/pkg/plugin/server/types.go @@ -48,6 +48,11 @@ type NewProxyContent struct { msg.NewProxy } +type CloseProxyContent struct { + User UserInfo `json:"user"` + msg.CloseProxy +} + type PingContent struct { User UserInfo `json:"user"` msg.Ping diff --git a/server/control.go b/server/control.go index 25adc2d..0974061 100644 --- a/server/control.go +++ b/server/control.go @@ -376,6 +376,20 @@ func (ctl *Control) stoper() { pxy.Close() ctl.pxyManager.Del(pxy.GetName()) metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) + + notifyContent := &plugin.CloseProxyContent{ + User: plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunID: ctl.loginMsg.RunID, + }, + CloseProxy: msg.CloseProxy{ + ProxyName: pxy.GetName(), + }, + } + go func() { + ctl.pluginManager.CloseProxy(notifyContent) + }() } ctl.allShutdown.Done() @@ -564,5 +578,20 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) { ctl.mu.Unlock() metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) + + notifyContent := &plugin.CloseProxyContent{ + User: plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunID: ctl.loginMsg.RunID, + }, + CloseProxy: msg.CloseProxy{ + ProxyName: pxy.GetName(), + }, + } + go func() { + ctl.pluginManager.CloseProxy(notifyContent) + }() + return } diff --git a/test/e2e/framework/process.go b/test/e2e/framework/process.go index a1b1571..197cb7d 100644 --- a/test/e2e/framework/process.go +++ b/test/e2e/framework/process.go @@ -12,7 +12,7 @@ import ( // RunProcesses run multiple processes from templates. // The first template should always be frps. -func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []string) { +func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []string) ([]*process.Process, []*process.Process) { templates := make([]string, 0, len(serverTemplates)+len(clientTemplates)) for _, t := range serverTemplates { templates = append(templates, t) @@ -28,6 +28,7 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str f.usedPorts[name] = port } + currentServerProcesses := make([]*process.Process, 0, len(serverTemplates)) for i := range serverTemplates { path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-server-%d", i)) err = os.WriteFile(path, []byte(outs[i]), 0666) @@ -37,11 +38,13 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str p := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", path}, f.osEnvs) f.serverConfPaths = append(f.serverConfPaths, path) f.serverProcesses = append(f.serverProcesses, p) + currentServerProcesses = append(currentServerProcesses, p) err = p.Start() ExpectNoError(err) } time.Sleep(time.Second) + currentClientProcesses := make([]*process.Process, 0, len(clientTemplates)) for i := range clientTemplates { index := i + len(serverTemplates) path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-client-%d", i)) @@ -52,11 +55,14 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str p := process.NewWithEnvs(TestContext.FRPClientPath, []string{"-c", path}, f.osEnvs) f.clientConfPaths = append(f.clientConfPaths, path) f.clientProcesses = append(f.clientProcesses, p) + currentClientProcesses = append(currentClientProcesses, p) err = p.Start() ExpectNoError(err) time.Sleep(500 * time.Millisecond) } time.Sleep(500 * time.Millisecond) + + return currentServerProcesses, currentClientProcesses } func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) { diff --git a/test/e2e/plugin/server.go b/test/e2e/plugin/server.go index 79ecff4..b972f78 100644 --- a/test/e2e/plugin/server.go +++ b/test/e2e/plugin/server.go @@ -158,6 +158,56 @@ var _ = Describe("[Feature: Server-Plugins]", func() { }) }) + Describe("CloseProxy", func() { + newFunc := func() *plugin.Request { + var r plugin.Request + r.Content = &plugin.CloseProxyContent{} + return &r + } + + It("Validate Info", func() { + localPort := f.AllocPort() + var recordProxyName string + handler := func(req *plugin.Request) *plugin.Response { + var ret plugin.Response + content := req.Content.(*plugin.CloseProxyContent) + recordProxyName = content.ProxyName + return &ret + } + pluginServer := NewHTTPPluginServer(localPort, newFunc, handler, nil) + + f.RunServer("", pluginServer) + + serverConf := consts.DefaultServerConfig + fmt.Sprintf(` + [plugin.test] + addr = 127.0.0.1:%d + path = /handler + ops = CloseProxy + `, localPort) + clientConf := consts.DefaultClientConfig + + remotePort := f.AllocPort() + clientConf += fmt.Sprintf(` + [tcp] + type = tcp + local_port = {{ .%s }} + remote_port = %d + `, framework.TCPEchoServerPort, remotePort) + + _, clients := f.RunProcesses([]string{serverConf}, []string{clientConf}) + + framework.NewRequestExpect(f).Port(remotePort).Ensure() + + for _, c := range clients { + c.Stop() + } + + time.Sleep(1 * time.Second) + + framework.ExpectEqual(recordProxyName, "tcp") + }) + }) + Describe("Ping", func() { newFunc := func() *plugin.Request { var r plugin.Request