new feature plugin and unix domian socket plugin

This commit is contained in:
fatedier 2017-05-21 22:42:42 +08:00
parent faf584e1dd
commit 738e5dad22
12 changed files with 274 additions and 26 deletions

View File

@ -1,4 +1,4 @@
FROM golang:1.6 FROM golang:1.8
COPY . /go/src/github.com/fatedier/frp COPY . /go/src/github.com/fatedier/frp

View File

@ -42,7 +42,7 @@ alltest: gotest
clean: clean:
rm -f ./bin/frpc rm -f ./bin/frpc
rm -f ./bin/frps rm -f ./bin/frps
cd ./test && ./clean_test.sh && cd - cd ./tests && ./clean_test.sh && cd -
save: save:
godep save ./... godep save ./...

View File

@ -166,7 +166,7 @@ func (ctl *Control) NewWorkConn() {
// dispatch this work connection to related proxy // dispatch this work connection to related proxy
if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok { if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok {
workConn.Info("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn) go pxy.InWorkConn(workConn)
} else { } else {
workConn.Close() workConn.Close()

View File

@ -23,6 +23,7 @@ import (
"github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/plugin"
"github.com/fatedier/frp/models/proto/tcp" "github.com/fatedier/frp/models/proto/tcp"
"github.com/fatedier/frp/models/proto/udp" "github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/errors" "github.com/fatedier/frp/utils/errors"
@ -82,18 +83,28 @@ type TcpProxy struct {
BaseProxy BaseProxy
cfg *config.TcpProxyConf cfg *config.TcpProxyConf
proxyPlugin plugin.Plugin
} }
func (pxy *TcpProxy) Run() (err error) { func (pxy *TcpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return return
} }
func (pxy *TcpProxy) Close() { func (pxy *TcpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
} }
func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) {
defer conn.Close() defer conn.Close()
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn)
} }
// HTTP // HTTP
@ -101,18 +112,28 @@ type HttpProxy struct {
BaseProxy BaseProxy
cfg *config.HttpProxyConf cfg *config.HttpProxyConf
proxyPlugin plugin.Plugin
} }
func (pxy *HttpProxy) Run() (err error) { func (pxy *HttpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return return
} }
func (pxy *HttpProxy) Close() { func (pxy *HttpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
} }
func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) {
defer conn.Close() defer conn.Close()
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn)
} }
// HTTPS // HTTPS
@ -120,18 +141,28 @@ type HttpsProxy struct {
BaseProxy BaseProxy
cfg *config.HttpsProxyConf cfg *config.HttpsProxyConf
proxyPlugin plugin.Plugin
} }
func (pxy *HttpsProxy) Run() (err error) { func (pxy *HttpsProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return return
} }
func (pxy *HttpsProxy) Close() { func (pxy *HttpsProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
} }
func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) { func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) {
defer conn.Close() defer conn.Close()
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, &pxy.cfg.BaseProxyConf, conn) HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn)
} }
// UDP // UDP
@ -240,14 +271,13 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
} }
// Common handler for tcp work connections. // Common handler for tcp work connections.
func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, baseInfo *config.BaseProxyConf, workConn frpNet.Conn) { func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) baseInfo *config.BaseProxyConf, workConn frpNet.Conn) {
if err != nil {
workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
return
}
var remote io.ReadWriteCloser var (
remote io.ReadWriteCloser
err error
)
remote = workConn remote = workConn
if baseInfo.UseEncryption { if baseInfo.UseEncryption {
remote, err = tcp.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken)) remote, err = tcp.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken))
@ -259,8 +289,23 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, baseInfo *config.Ba
if baseInfo.UseCompression { if baseInfo.UseCompression {
remote = tcp.WithCompression(remote) remote = tcp.WithCompression(remote)
} }
if proxyPlugin != nil {
// if plugin is set, let plugin handle connections first
workConn.Debug("handle by plugin: %s", proxyPlugin.Name())
proxyPlugin.Handle(remote)
workConn.Debug("handle by plugin finished")
return
} else {
localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
if err != nil {
workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
return
}
workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
tcp.Join(localConn, remote) tcp.Join(localConn, remote)
workConn.Debug("join connections closed") workConn.Debug("join connections closed")
}
} }

View File

@ -46,7 +46,7 @@ Options:
--log-level=<log_level> set log level: debug, info, warn, error --log-level=<log_level> set log level: debug, info, warn, error
--server-addr=<server_addr> addr which frps is listening for, example: 0.0.0.0:7000 --server-addr=<server_addr> addr which frps is listening for, example: 0.0.0.0:7000
-h --help show this screen -h --help show this screen
--version show version -v --version show version
` `
func main() { func main() {

View File

@ -81,3 +81,12 @@ use_encryption = false
use_compression = false use_compression = false
subdomain = web01 subdomain = web01
custom_domains = web02.yourdomain.com custom_domains = web02.yourdomain.com
[unix_domain]
type = tcp
remote_port = 6001
# if plugin is defined, local_ip and local_port is useless
# plugin will handle connections got from frps
plugin = unix_domain_socket
# params set with prefix "plugin_" that plugin needed
plugin_unix_path = /var/run/docker.sock

View File

@ -239,6 +239,7 @@ func (cfg *DomainConf) check() (err error) {
return nil return nil
} }
// Local service info
type LocalSvrConf struct { type LocalSvrConf struct {
LocalIp string `json:"-"` LocalIp string `json:"-"`
LocalPort int `json:"-"` LocalPort int `json:"-"`
@ -259,12 +260,34 @@ func (cfg *LocalSvrConf) LoadFromFile(name string, section ini.Section) (err err
return nil return nil
} }
type PluginConf struct {
Plugin string `json:"-"`
PluginParams map[string]string `json:"-"`
}
func (cfg *PluginConf) LoadFromFile(name string, section ini.Section) (err error) {
cfg.Plugin = section["plugin"]
cfg.PluginParams = make(map[string]string)
if cfg.Plugin != "" {
// get params begin with "plugin_"
for k, v := range section {
if strings.HasPrefix(k, "plugin_") {
cfg.PluginParams[k] = v
}
}
} else {
return fmt.Errorf("Parse conf error: proxy [%s] no plugin info found", name)
}
return
}
// TCP // TCP
type TcpProxyConf struct { type TcpProxyConf struct {
BaseProxyConf BaseProxyConf
BindInfoConf BindInfoConf
LocalSvrConf LocalSvrConf
PluginConf
} }
func (cfg *TcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { func (cfg *TcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) {
@ -279,9 +302,12 @@ func (cfg *TcpProxyConf) LoadFromFile(name string, section ini.Section) (err err
if err = cfg.BindInfoConf.LoadFromFile(name, section); err != nil { if err = cfg.BindInfoConf.LoadFromFile(name, section); err != nil {
return return
} }
if err = cfg.PluginConf.LoadFromFile(name, section); err != nil {
if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
return return
} }
}
return return
} }
@ -337,6 +363,7 @@ type HttpProxyConf struct {
DomainConf DomainConf
LocalSvrConf LocalSvrConf
PluginConf
Locations []string `json:"locations"` Locations []string `json:"locations"`
HostHeaderRewrite string `json:"host_header_rewrite"` HostHeaderRewrite string `json:"host_header_rewrite"`
@ -405,6 +432,7 @@ type HttpsProxyConf struct {
DomainConf DomainConf
LocalSvrConf LocalSvrConf
PluginConf
} }
func (cfg *HttpsProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { func (cfg *HttpsProxyConf) LoadFromMsg(pMsg *msg.NewProxy) {

45
models/plugin/plugin.go Normal file
View File

@ -0,0 +1,45 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package plugin
import (
"fmt"
"io"
)
// Creators is used for create plugins to handle connections.
var creators = make(map[string]CreatorFn)
// params has prefix "plugin_"
type CreatorFn func(params map[string]string) (Plugin, error)
func Register(name string, fn CreatorFn) {
creators[name] = fn
}
func Create(name string, params map[string]string) (p Plugin, err error) {
if fn, ok := creators[name]; ok {
p, err = fn(params)
} else {
err = fmt.Errorf("plugin [%s] is not registered", name)
}
return
}
type Plugin interface {
Name() string
Handle(conn io.ReadWriteCloser)
Close() error
}

View File

@ -0,0 +1,69 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package plugin
import (
"fmt"
"io"
"net"
"github.com/fatedier/frp/models/proto/tcp"
)
const PluginUnixDomainSocket = "unix_domain_socket"
func init() {
Register(PluginUnixDomainSocket, NewUnixDomainSocketPlugin)
}
type UnixDomainSocketPlugin struct {
UnixAddr *net.UnixAddr
}
func NewUnixDomainSocketPlugin(params map[string]string) (p Plugin, err error) {
unixPath, ok := params["plugin_unix_path"]
if !ok {
err = fmt.Errorf("plugin_unix_path not found")
return
}
unixAddr, errRet := net.ResolveUnixAddr("unix", unixPath)
if errRet != nil {
err = errRet
return
}
p = &UnixDomainSocketPlugin{
UnixAddr: unixAddr,
}
return
}
func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser) {
localConn, err := net.DialUnix("unix", nil, uds.UnixAddr)
if err != nil {
return
}
tcp.Join(localConn, conn)
}
func (uds *UnixDomainSocketPlugin) Name() string {
return PluginUnixDomainSocket
}
func (uds *UnixDomainSocketPlugin) Close() error {
return nil
}

View File

@ -27,3 +27,9 @@ type = udp
local_ip = 127.0.0.1 local_ip = 127.0.0.1
local_port = 10703 local_port = 10703
remote_port = 10712 remote_port = 10712
[unix_domain]
type = tcp
remote_port = 10704
plugin = unix_domain_socket
plugin_unix_path = /tmp/frp_echo_server.sock

View File

@ -4,12 +4,15 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"net"
"os"
"syscall"
"github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
) )
func StartEchoServer() { func StartEchoServer() {
l, err := net.ListenTcp("127.0.0.1", 10701) l, err := frpNet.ListenTcp("127.0.0.1", 10701)
if err != nil { if err != nil {
fmt.Printf("echo server listen error: %v\n", err) fmt.Printf("echo server listen error: %v\n", err)
return return
@ -27,7 +30,7 @@ func StartEchoServer() {
} }
func StartUdpEchoServer() { func StartUdpEchoServer() {
l, err := net.ListenUDP("127.0.0.1", 10703) l, err := frpNet.ListenUDP("127.0.0.1", 10703)
if err != nil { if err != nil {
fmt.Printf("udp echo server listen error: %v\n", err) fmt.Printf("udp echo server listen error: %v\n", err)
return return
@ -44,6 +47,27 @@ func StartUdpEchoServer() {
} }
} }
func StartUnixDomainServer() {
unixPath := "/tmp/frp_echo_server.sock"
os.Remove(unixPath)
syscall.Umask(0)
l, err := net.Listen("unix", unixPath)
if err != nil {
fmt.Printf("unix domain server listen error: %v\n", err)
return
}
for {
c, err := l.Accept()
if err != nil {
fmt.Printf("unix domain server accept error: %v\n", err)
return
}
go echoWorker(c)
}
}
func echoWorker(c net.Conn) { func echoWorker(c net.Conn) {
br := bufio.NewReader(c) br := bufio.NewReader(c)
for { for {

View File

@ -26,6 +26,7 @@ func init() {
go StartEchoServer() go StartEchoServer()
go StartUdpEchoServer() go StartUdpEchoServer()
go StartHttpServer() go StartHttpServer()
go StartUnixDomainServer()
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
} }
@ -95,3 +96,24 @@ func TestUdpEchoServer(t *testing.T) {
t.Fatalf("message got from udp server error, get %s", string(data[:n-1])) t.Fatalf("message got from udp server error, get %s", string(data[:n-1]))
} }
} }
func TestUnixDomainServer(t *testing.T) {
c, err := frpNet.ConnectTcpServer(fmt.Sprintf("127.0.0.1:%d", 10704))
if err != nil {
t.Fatalf("connect to echo server error: %v", err)
}
timer := time.Now().Add(time.Duration(5) * time.Second)
c.SetDeadline(timer)
c.Write([]byte(ECHO_TEST_STR + "\n"))
br := bufio.NewReader(c)
buf, err := br.ReadString('\n')
if err != nil {
t.Fatalf("read from echo server error: %v", err)
}
if ECHO_TEST_STR != buf {
t.Fatalf("content error, send [%s], get [%s]", strings.Trim(ECHO_TEST_STR, "\n"), strings.Trim(buf, "\n"))
}
}