From 738e5dad224263c5b3f6fb0214faf9c094497af2 Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 21 May 2017 22:42:42 +0800 Subject: [PATCH] new feature plugin and unix domian socket plugin --- Dockerfile | 2 +- Makefile | 2 +- client/control.go | 2 +- client/proxy.go | 79 ++++++++++++++++++++++------- cmd/frpc/main.go | 2 +- conf/frpc.ini | 9 ++++ models/config/proxy.go | 32 +++++++++++- models/plugin/plugin.go | 45 ++++++++++++++++ models/plugin/unix_domain_socket.go | 69 +++++++++++++++++++++++++ tests/conf/auto_test_frpc.ini | 6 +++ tests/echo_server.go | 30 +++++++++-- tests/func_test.go | 22 ++++++++ 12 files changed, 274 insertions(+), 26 deletions(-) create mode 100644 models/plugin/plugin.go create mode 100644 models/plugin/unix_domain_socket.go diff --git a/Dockerfile b/Dockerfile index 934e991..2f22088 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.6 +FROM golang:1.8 COPY . /go/src/github.com/fatedier/frp diff --git a/Makefile b/Makefile index 2d3ce7e..31b4012 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ alltest: gotest clean: rm -f ./bin/frpc rm -f ./bin/frps - cd ./test && ./clean_test.sh && cd - + cd ./tests && ./clean_test.sh && cd - save: godep save ./... diff --git a/client/control.go b/client/control.go index 9e33079..a132eba 100644 --- a/client/control.go +++ b/client/control.go @@ -166,7 +166,7 @@ func (ctl *Control) NewWorkConn() { // dispatch this work connection to related proxy if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok { - workConn.Info("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) go pxy.InWorkConn(workConn) } else { workConn.Close() diff --git a/client/proxy.go b/client/proxy.go index 818aebc..4e1d020 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -23,6 +23,7 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/plugin" "github.com/fatedier/frp/models/proto/tcp" "github.com/fatedier/frp/models/proto/udp" "github.com/fatedier/frp/utils/errors" @@ -81,57 +82,87 @@ type BaseProxy struct { type TcpProxy struct { BaseProxy - cfg *config.TcpProxyConf + cfg *config.TcpProxyConf + proxyPlugin plugin.Plugin } func (pxy *TcpProxy) Run() (err error) { + if pxy.cfg.Plugin != "" { + pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams) + if err != nil { + return + } + } return } func (pxy *TcpProxy) Close() { + if pxy.proxyPlugin != nil { + pxy.proxyPlugin.Close() + } } func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) { defer conn.Close() - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) + HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) } // HTTP type HttpProxy struct { BaseProxy - cfg *config.HttpProxyConf + cfg *config.HttpProxyConf + proxyPlugin plugin.Plugin } func (pxy *HttpProxy) Run() (err error) { + if pxy.cfg.Plugin != "" { + pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams) + if err != nil { + return + } + } return } func (pxy *HttpProxy) Close() { + if pxy.proxyPlugin != nil { + pxy.proxyPlugin.Close() + } } func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) { defer conn.Close() - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) + HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) } // HTTPS type HttpsProxy struct { BaseProxy - cfg *config.HttpsProxyConf + cfg *config.HttpsProxyConf + proxyPlugin plugin.Plugin } func (pxy *HttpsProxy) Run() (err error) { + if pxy.cfg.Plugin != "" { + pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams) + if err != nil { + return + } + } return } func (pxy *HttpsProxy) Close() { + if pxy.proxyPlugin != nil { + pxy.proxyPlugin.Close() + } } func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) { defer conn.Close() - HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) + HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) } // UDP @@ -240,14 +271,13 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { } // Common handler for tcp work connections. -func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, baseInfo *config.BaseProxyConf, workConn frpNet.Conn) { - localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) - if err != nil { - workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) - return - } +func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, + baseInfo *config.BaseProxyConf, workConn frpNet.Conn) { - var remote io.ReadWriteCloser + var ( + remote io.ReadWriteCloser + err error + ) remote = workConn if baseInfo.UseEncryption { remote, err = tcp.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken)) @@ -259,8 +289,23 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, baseInfo *config.Ba if baseInfo.UseCompression { remote = tcp.WithCompression(remote) } - workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), - localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - tcp.Join(localConn, remote) - workConn.Debug("join connections closed") + + if proxyPlugin != nil { + // if plugin is set, let plugin handle connections first + workConn.Debug("handle by plugin: %s", proxyPlugin.Name()) + proxyPlugin.Handle(remote) + workConn.Debug("handle by plugin finished") + return + } else { + localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) + if err != nil { + workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) + return + } + + workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), + localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + tcp.Join(localConn, remote) + workConn.Debug("join connections closed") + } } diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index b86e09f..4f816a9 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -46,7 +46,7 @@ Options: --log-level= set log level: debug, info, warn, error --server-addr= addr which frps is listening for, example: 0.0.0.0:7000 -h --help show this screen - --version show version + -v --version show version ` func main() { diff --git a/conf/frpc.ini b/conf/frpc.ini index 6c620ec..8546eae 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -81,3 +81,12 @@ use_encryption = false use_compression = false subdomain = web01 custom_domains = web02.yourdomain.com + +[unix_domain] +type = tcp +remote_port = 6001 +# if plugin is defined, local_ip and local_port is useless +# plugin will handle connections got from frps +plugin = unix_domain_socket +# params set with prefix "plugin_" that plugin needed +plugin_unix_path = /var/run/docker.sock diff --git a/models/config/proxy.go b/models/config/proxy.go index 7b54feb..71fb67b 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -239,6 +239,7 @@ func (cfg *DomainConf) check() (err error) { return nil } +// Local service info type LocalSvrConf struct { LocalIp string `json:"-"` LocalPort int `json:"-"` @@ -259,12 +260,34 @@ func (cfg *LocalSvrConf) LoadFromFile(name string, section ini.Section) (err err return nil } +type PluginConf struct { + Plugin string `json:"-"` + PluginParams map[string]string `json:"-"` +} + +func (cfg *PluginConf) LoadFromFile(name string, section ini.Section) (err error) { + cfg.Plugin = section["plugin"] + cfg.PluginParams = make(map[string]string) + if cfg.Plugin != "" { + // get params begin with "plugin_" + for k, v := range section { + if strings.HasPrefix(k, "plugin_") { + cfg.PluginParams[k] = v + } + } + } else { + return fmt.Errorf("Parse conf error: proxy [%s] no plugin info found", name) + } + return +} + // TCP type TcpProxyConf struct { BaseProxyConf BindInfoConf LocalSvrConf + PluginConf } func (cfg *TcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { @@ -279,8 +302,11 @@ func (cfg *TcpProxyConf) LoadFromFile(name string, section ini.Section) (err err if err = cfg.BindInfoConf.LoadFromFile(name, section); err != nil { return } - if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { - return + + if err = cfg.PluginConf.LoadFromFile(name, section); err != nil { + if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { + return + } } return } @@ -337,6 +363,7 @@ type HttpProxyConf struct { DomainConf LocalSvrConf + PluginConf Locations []string `json:"locations"` HostHeaderRewrite string `json:"host_header_rewrite"` @@ -405,6 +432,7 @@ type HttpsProxyConf struct { DomainConf LocalSvrConf + PluginConf } func (cfg *HttpsProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { diff --git a/models/plugin/plugin.go b/models/plugin/plugin.go new file mode 100644 index 0000000..0c60461 --- /dev/null +++ b/models/plugin/plugin.go @@ -0,0 +1,45 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "fmt" + "io" +) + +// Creators is used for create plugins to handle connections. +var creators = make(map[string]CreatorFn) + +// params has prefix "plugin_" +type CreatorFn func(params map[string]string) (Plugin, error) + +func Register(name string, fn CreatorFn) { + creators[name] = fn +} + +func Create(name string, params map[string]string) (p Plugin, err error) { + if fn, ok := creators[name]; ok { + p, err = fn(params) + } else { + err = fmt.Errorf("plugin [%s] is not registered", name) + } + return +} + +type Plugin interface { + Name() string + Handle(conn io.ReadWriteCloser) + Close() error +} diff --git a/models/plugin/unix_domain_socket.go b/models/plugin/unix_domain_socket.go new file mode 100644 index 0000000..5fd8360 --- /dev/null +++ b/models/plugin/unix_domain_socket.go @@ -0,0 +1,69 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "fmt" + "io" + "net" + + "github.com/fatedier/frp/models/proto/tcp" +) + +const PluginUnixDomainSocket = "unix_domain_socket" + +func init() { + Register(PluginUnixDomainSocket, NewUnixDomainSocketPlugin) +} + +type UnixDomainSocketPlugin struct { + UnixAddr *net.UnixAddr +} + +func NewUnixDomainSocketPlugin(params map[string]string) (p Plugin, err error) { + unixPath, ok := params["plugin_unix_path"] + if !ok { + err = fmt.Errorf("plugin_unix_path not found") + return + } + + unixAddr, errRet := net.ResolveUnixAddr("unix", unixPath) + if errRet != nil { + err = errRet + return + } + + p = &UnixDomainSocketPlugin{ + UnixAddr: unixAddr, + } + return +} + +func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser) { + localConn, err := net.DialUnix("unix", nil, uds.UnixAddr) + if err != nil { + return + } + + tcp.Join(localConn, conn) +} + +func (uds *UnixDomainSocketPlugin) Name() string { + return PluginUnixDomainSocket +} + +func (uds *UnixDomainSocketPlugin) Close() error { + return nil +} diff --git a/tests/conf/auto_test_frpc.ini b/tests/conf/auto_test_frpc.ini index 8bdc44b..d21f01f 100644 --- a/tests/conf/auto_test_frpc.ini +++ b/tests/conf/auto_test_frpc.ini @@ -27,3 +27,9 @@ type = udp local_ip = 127.0.0.1 local_port = 10703 remote_port = 10712 + +[unix_domain] +type = tcp +remote_port = 10704 +plugin = unix_domain_socket +plugin_unix_path = /tmp/frp_echo_server.sock diff --git a/tests/echo_server.go b/tests/echo_server.go index 82df23c..391c87e 100644 --- a/tests/echo_server.go +++ b/tests/echo_server.go @@ -4,12 +4,15 @@ import ( "bufio" "fmt" "io" + "net" + "os" + "syscall" - "github.com/fatedier/frp/utils/net" + frpNet "github.com/fatedier/frp/utils/net" ) func StartEchoServer() { - l, err := net.ListenTcp("127.0.0.1", 10701) + l, err := frpNet.ListenTcp("127.0.0.1", 10701) if err != nil { fmt.Printf("echo server listen error: %v\n", err) return @@ -27,7 +30,7 @@ func StartEchoServer() { } func StartUdpEchoServer() { - l, err := net.ListenUDP("127.0.0.1", 10703) + l, err := frpNet.ListenUDP("127.0.0.1", 10703) if err != nil { fmt.Printf("udp echo server listen error: %v\n", err) return @@ -44,6 +47,27 @@ func StartUdpEchoServer() { } } +func StartUnixDomainServer() { + unixPath := "/tmp/frp_echo_server.sock" + os.Remove(unixPath) + syscall.Umask(0) + l, err := net.Listen("unix", unixPath) + if err != nil { + fmt.Printf("unix domain server listen error: %v\n", err) + return + } + + for { + c, err := l.Accept() + if err != nil { + fmt.Printf("unix domain server accept error: %v\n", err) + return + } + + go echoWorker(c) + } +} + func echoWorker(c net.Conn) { br := bufio.NewReader(c) for { diff --git a/tests/func_test.go b/tests/func_test.go index b2ad764..444e673 100644 --- a/tests/func_test.go +++ b/tests/func_test.go @@ -26,6 +26,7 @@ func init() { go StartEchoServer() go StartUdpEchoServer() go StartHttpServer() + go StartUnixDomainServer() time.Sleep(500 * time.Millisecond) } @@ -95,3 +96,24 @@ func TestUdpEchoServer(t *testing.T) { t.Fatalf("message got from udp server error, get %s", string(data[:n-1])) } } + +func TestUnixDomainServer(t *testing.T) { + c, err := frpNet.ConnectTcpServer(fmt.Sprintf("127.0.0.1:%d", 10704)) + if err != nil { + t.Fatalf("connect to echo server error: %v", err) + } + timer := time.Now().Add(time.Duration(5) * time.Second) + c.SetDeadline(timer) + + c.Write([]byte(ECHO_TEST_STR + "\n")) + + br := bufio.NewReader(c) + buf, err := br.ReadString('\n') + if err != nil { + t.Fatalf("read from echo server error: %v", err) + } + + if ECHO_TEST_STR != buf { + t.Fatalf("content error, send [%s], get [%s]", strings.Trim(ECHO_TEST_STR, "\n"), strings.Trim(buf, "\n")) + } +}