frp/client/visitor.go

339 lines
7.8 KiB
Go
Raw Normal View History

2017-06-26 03:02:33 +08:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
2017-10-24 22:53:20 +08:00
"bytes"
2017-10-24 18:20:07 +08:00
"fmt"
2017-06-26 03:02:33 +08:00
"io"
2017-10-24 18:20:07 +08:00
"net"
2017-10-24 22:53:20 +08:00
"strconv"
2017-10-24 18:20:07 +08:00
"strings"
2017-06-26 03:02:33 +08:00
"sync"
"time"
2017-10-24 18:20:07 +08:00
"golang.org/x/net/ipv4"
2018-04-10 17:46:49 +08:00
"github.com/fatedier/frp/g"
2017-06-26 03:02:33 +08:00
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
2018-05-08 02:13:30 +08:00
2018-05-08 23:42:04 +08:00
frpIo "github.com/fatedier/golib/io"
2018-05-08 02:13:30 +08:00
"github.com/fatedier/golib/pool"
2017-06-26 03:02:33 +08:00
)
2017-12-05 01:34:33 +08:00
// Visitor is used for forward traffics from local port tot remote service.
type Visitor interface {
2017-06-26 03:02:33 +08:00
Run() error
Close()
log.Logger
}
func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
2017-12-05 01:34:33 +08:00
baseVisitor := BaseVisitor{
2017-06-26 03:02:33 +08:00
ctl: ctl,
Logger: log.NewPrefixLogger(cfg.GetBaseInfo().ProxyName),
2017-06-26 03:02:33 +08:00
}
switch cfg := cfg.(type) {
case *config.StcpVisitorConf:
2017-12-05 01:34:33 +08:00
visitor = &StcpVisitor{
2019-01-31 16:49:23 +08:00
BaseVisitor: &baseVisitor,
2017-12-05 01:34:33 +08:00
cfg: cfg,
2017-06-26 03:02:33 +08:00
}
case *config.XtcpVisitorConf:
2017-12-05 01:34:33 +08:00
visitor = &XtcpVisitor{
2019-01-31 16:49:23 +08:00
BaseVisitor: &baseVisitor,
2017-12-05 01:34:33 +08:00
cfg: cfg,
2017-10-24 18:20:07 +08:00
}
2017-06-26 03:02:33 +08:00
}
return
}
2017-12-05 01:34:33 +08:00
type BaseVisitor struct {
2017-06-26 03:02:33 +08:00
ctl *Control
l frpNet.Listener
closed bool
mu sync.RWMutex
log.Logger
}
2017-12-05 01:34:33 +08:00
type StcpVisitor struct {
2019-01-31 16:49:23 +08:00
*BaseVisitor
2017-06-26 03:02:33 +08:00
cfg *config.StcpVisitorConf
2017-06-26 03:02:33 +08:00
}
2017-12-05 01:34:33 +08:00
func (sv *StcpVisitor) Run() (err error) {
sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort)
2017-06-26 03:02:33 +08:00
if err != nil {
return
}
go sv.worker()
return
}
2017-12-05 01:34:33 +08:00
func (sv *StcpVisitor) Close() {
2017-06-26 03:02:33 +08:00
sv.l.Close()
}
2017-12-05 01:34:33 +08:00
func (sv *StcpVisitor) worker() {
2017-06-26 03:02:33 +08:00
for {
conn, err := sv.l.Accept()
if err != nil {
sv.Warn("stcp local listener closed")
return
}
go sv.handleConn(conn)
}
}
2017-12-05 01:34:33 +08:00
func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
2017-06-26 03:02:33 +08:00
defer userConn.Close()
sv.Debug("get a new stcp user connection")
2017-12-05 01:34:33 +08:00
visitorConn, err := sv.ctl.connectServer()
2017-06-26 03:02:33 +08:00
if err != nil {
return
}
2017-12-05 01:34:33 +08:00
defer visitorConn.Close()
2017-06-26 03:02:33 +08:00
now := time.Now().Unix()
2017-12-05 01:34:33 +08:00
newVisitorConnMsg := &msg.NewVisitorConn{
2017-06-26 03:02:33 +08:00
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
}
2017-12-05 01:34:33 +08:00
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
2017-06-26 03:02:33 +08:00
if err != nil {
2017-12-05 01:34:33 +08:00
sv.Warn("send newVisitorConnMsg to server error: %v", err)
2017-06-26 03:02:33 +08:00
return
}
2017-12-05 01:34:33 +08:00
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
2017-06-26 03:02:33 +08:00
if err != nil {
2017-12-05 01:34:33 +08:00
sv.Warn("get newVisitorConnRespMsg error: %v", err)
2017-06-26 03:02:33 +08:00
return
}
2017-12-05 01:34:33 +08:00
visitorConn.SetReadDeadline(time.Time{})
2017-06-26 03:02:33 +08:00
2017-12-05 01:34:33 +08:00
if newVisitorConnRespMsg.Error != "" {
sv.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
2017-06-26 03:02:33 +08:00
return
}
var remote io.ReadWriteCloser
2017-12-05 01:34:33 +08:00
remote = visitorConn
2017-06-26 03:02:33 +08:00
if sv.cfg.UseEncryption {
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
sv.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
remote = frpIo.WithCompression(remote)
}
frpIo.Join(userConn, remote)
}
2017-10-24 18:20:07 +08:00
2017-12-05 01:34:33 +08:00
type XtcpVisitor struct {
2019-01-31 16:49:23 +08:00
*BaseVisitor
2017-10-24 18:20:07 +08:00
cfg *config.XtcpVisitorConf
2017-10-24 18:20:07 +08:00
}
2017-12-05 01:34:33 +08:00
func (sv *XtcpVisitor) Run() (err error) {
sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort)
2017-10-24 18:20:07 +08:00
if err != nil {
return
}
go sv.worker()
return
}
2017-12-05 01:34:33 +08:00
func (sv *XtcpVisitor) Close() {
2017-10-24 18:20:07 +08:00
sv.l.Close()
}
2017-12-05 01:34:33 +08:00
func (sv *XtcpVisitor) worker() {
2017-10-24 18:20:07 +08:00
for {
conn, err := sv.l.Accept()
if err != nil {
2018-06-08 21:27:58 +08:00
sv.Warn("xtcp local listener closed")
2017-10-24 18:20:07 +08:00
return
}
go sv.handleConn(conn)
}
}
2017-12-05 01:34:33 +08:00
func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
2017-10-24 18:20:07 +08:00
defer userConn.Close()
sv.Debug("get a new xtcp user connection")
2018-04-10 17:46:49 +08:00
if g.GlbClientCfg.ServerUdpPort == 0 {
2017-10-24 18:20:07 +08:00
sv.Error("xtcp is not supported by server")
return
}
raddr, err := net.ResolveUDPAddr("udp",
2018-04-10 17:46:49 +08:00
fmt.Sprintf("%s:%d", g.GlbClientCfg.ServerAddr, g.GlbClientCfg.ServerUdpPort))
2018-10-18 13:55:51 +08:00
if err != nil {
sv.Error("resolve server UDP addr error")
return
}
2017-12-05 01:34:33 +08:00
visitorConn, err := net.DialUDP("udp", nil, raddr)
2018-10-18 13:55:51 +08:00
if err != nil {
sv.Warn("dial server udp addr error: %v", err)
return
}
2017-12-05 01:34:33 +08:00
defer visitorConn.Close()
2017-10-24 18:20:07 +08:00
now := time.Now().Unix()
2017-12-05 01:34:33 +08:00
natHoleVisitorMsg := &msg.NatHoleVisitor{
2017-10-24 18:20:07 +08:00
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
}
2017-12-05 01:34:33 +08:00
err = msg.WriteMsg(visitorConn, natHoleVisitorMsg)
2017-10-24 18:20:07 +08:00
if err != nil {
2017-12-05 01:34:33 +08:00
sv.Warn("send natHoleVisitorMsg to server error: %v", err)
2017-10-24 18:20:07 +08:00
return
}
// Wait for client address at most 10 seconds.
2017-10-24 22:53:20 +08:00
var natHoleRespMsg msg.NatHoleResp
2017-12-05 01:34:33 +08:00
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
2017-10-24 22:53:20 +08:00
buf := pool.GetBuf(1024)
2017-12-05 01:34:33 +08:00
n, err := visitorConn.Read(buf)
2017-10-24 22:53:20 +08:00
if err != nil {
sv.Warn("get natHoleRespMsg error: %v", err)
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
2017-10-24 18:20:07 +08:00
if err != nil {
sv.Warn("get natHoleRespMsg error: %v", err)
return
}
2017-12-05 01:34:33 +08:00
visitorConn.SetReadDeadline(time.Time{})
2017-10-24 22:53:20 +08:00
pool.PutBuf(buf)
if natHoleRespMsg.Error != "" {
sv.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
2017-10-24 22:53:20 +08:00
sv.Trace("get natHoleRespMsg, sid [%s], client address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr)
2017-10-24 18:20:07 +08:00
2017-12-05 01:34:33 +08:00
// Close visitorConn, so we can use it's local address.
visitorConn.Close()
2017-10-24 18:20:07 +08:00
2017-10-24 22:53:20 +08:00
// Send detect message.
array := strings.Split(natHoleRespMsg.ClientAddr, ":")
if len(array) <= 1 {
sv.Error("get natHoleResp client address error: %s", natHoleRespMsg.ClientAddr)
2017-10-24 18:20:07 +08:00
return
}
2017-12-05 01:34:33 +08:00
laddr, _ := net.ResolveUDPAddr("udp", visitorConn.LocalAddr().String())
2017-10-24 22:53:20 +08:00
/*
for i := 1000; i < 65000; i++ {
sv.sendDetectMsg(array[0], int64(i), laddr, "a")
}
*/
port, err := strconv.ParseInt(array[1], 10, 64)
if err != nil {
sv.Error("get natHoleResp client address error: %s", natHoleRespMsg.ClientAddr)
return
2017-10-24 18:20:07 +08:00
}
sv.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
2017-10-24 22:53:20 +08:00
sv.Trace("send all detect msg done")
2017-10-24 18:20:07 +08:00
2017-12-05 01:34:33 +08:00
// Listen for visitorConn's address and wait for client connection.
2017-12-05 22:26:53 +08:00
lConn, err := net.ListenUDP("udp", laddr)
if err != nil {
sv.Error("listen on visitorConn's local adress error: %v", err)
return
}
2017-10-24 22:53:20 +08:00
lConn.SetReadDeadline(time.Now().Add(5 * time.Second))
2017-10-24 18:20:07 +08:00
sidBuf := pool.GetBuf(1024)
2017-10-24 22:53:20 +08:00
n, _, err = lConn.ReadFromUDP(sidBuf)
2017-10-24 18:20:07 +08:00
if err != nil {
sv.Warn("get sid from client error: %v", err)
return
}
lConn.SetReadDeadline(time.Time{})
2017-10-24 22:53:20 +08:00
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
2017-10-24 18:20:07 +08:00
sv.Warn("incorrect sid from client")
return
}
2017-10-24 22:53:20 +08:00
sv.Info("nat hole connection make success, sid [%s]", string(sidBuf[:n]))
2017-10-24 18:20:07 +08:00
pool.PutBuf(sidBuf)
var remote io.ReadWriteCloser
2017-10-24 22:53:20 +08:00
remote, err = frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.ClientAddr)
2017-10-24 18:20:07 +08:00
if err != nil {
sv.Error("create kcp connection from udp connection error: %v", err)
return
}
if sv.cfg.UseEncryption {
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
sv.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
remote = frpIo.WithCompression(remote)
}
frpIo.Join(userConn, remote)
2017-10-25 02:49:56 +08:00
sv.Debug("join connections closed")
2017-10-24 18:20:07 +08:00
}
func (sv *XtcpVisitor) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
2017-10-24 18:20:07 +08:00
daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
if err != nil {
return err
}
tConn, err := net.DialUDP("udp", laddr, daddr)
if err != nil {
return err
}
uConn := ipv4.NewConn(tConn)
uConn.SetTTL(3)
2017-10-24 22:53:20 +08:00
tConn.Write(content)
2017-10-24 18:20:07 +08:00
tConn.Close()
return nil
}