frp/pkg/proto/udp/udp.go

138 lines
3.0 KiB
Go
Raw Normal View History

2017-03-13 02:44:47 +08:00
// Copyright 2017 fatedier, fatedier@gmail.com
2016-12-19 01:22:21 +08:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-03-09 02:03:47 +08:00
package udp
2016-12-19 01:22:21 +08:00
import (
"encoding/base64"
"net"
2017-03-13 02:44:47 +08:00
"sync"
"time"
2016-12-19 01:22:21 +08:00
2020-09-23 13:49:14 +08:00
"github.com/fatedier/frp/pkg/msg"
2018-05-08 02:13:30 +08:00
"github.com/fatedier/golib/errors"
"github.com/fatedier/golib/pool"
2017-03-13 02:44:47 +08:00
)
2016-12-19 01:22:21 +08:00
2020-05-24 17:48:37 +08:00
func NewUDPPacket(buf []byte, laddr, raddr *net.UDPAddr) *msg.UDPPacket {
return &msg.UDPPacket{
2017-03-13 02:44:47 +08:00
Content: base64.StdEncoding.EncodeToString(buf),
LocalAddr: laddr,
RemoteAddr: raddr,
}
2016-12-19 01:22:21 +08:00
}
2020-05-24 17:48:37 +08:00
func GetContent(m *msg.UDPPacket) (buf []byte, err error) {
2017-03-13 02:44:47 +08:00
buf, err = base64.StdEncoding.DecodeString(m.Content)
return
2016-12-19 01:22:21 +08:00
}
2020-05-24 17:48:37 +08:00
func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh chan<- *msg.UDPPacket, bufSize int) {
2017-03-13 02:44:47 +08:00
// read
go func() {
for udpMsg := range readCh {
buf, err := GetContent(udpMsg)
if err != nil {
continue
}
udpConn.WriteToUDP(buf, udpMsg.RemoteAddr)
}
}()
// write
buf := pool.GetBuf(bufSize)
2017-04-25 00:34:14 +08:00
defer pool.PutBuf(buf)
for {
n, remoteAddr, err := udpConn.ReadFromUDP(buf)
if err != nil {
return
2017-03-13 02:44:47 +08:00
}
2017-04-25 00:34:14 +08:00
// buf[:n] will be encoded to string, so the bytes can be reused
2020-05-24 17:48:37 +08:00
udpMsg := NewUDPPacket(buf[:n], nil, remoteAddr)
2020-04-22 21:37:45 +08:00
2017-04-25 00:34:14 +08:00
select {
case sendCh <- udpMsg:
default:
}
}
2016-12-19 01:22:21 +08:00
}
2020-05-24 17:48:37 +08:00
func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UDPPacket, sendCh chan<- msg.Message, bufSize int) {
2017-03-13 02:44:47 +08:00
var (
mu sync.RWMutex
)
udpConnMap := make(map[string]*net.UDPConn)
2016-12-19 01:22:21 +08:00
2017-03-13 02:44:47 +08:00
// read from dstAddr and write to sendCh
writerFn := func(raddr *net.UDPAddr, udpConn *net.UDPConn) {
addr := raddr.String()
defer func() {
mu.Lock()
delete(udpConnMap, addr)
mu.Unlock()
2018-03-19 20:22:15 +08:00
udpConn.Close()
2017-03-13 02:44:47 +08:00
}()
2016-12-19 01:22:21 +08:00
buf := pool.GetBuf(bufSize)
2017-03-13 02:44:47 +08:00
for {
udpConn.SetReadDeadline(time.Now().Add(30 * time.Second))
n, _, err := udpConn.ReadFromUDP(buf)
if err != nil {
return
}
2016-12-19 01:22:21 +08:00
2020-05-24 17:48:37 +08:00
udpMsg := NewUDPPacket(buf[:n], nil, raddr)
2017-03-13 02:44:47 +08:00
if err = errors.PanicToError(func() {
select {
case sendCh <- udpMsg:
default:
}
}); err != nil {
return
}
}
2016-12-19 01:22:21 +08:00
}
2017-03-13 02:44:47 +08:00
// read from readCh
go func() {
for udpMsg := range readCh {
buf, err := GetContent(udpMsg)
if err != nil {
continue
}
mu.Lock()
udpConn, ok := udpConnMap[udpMsg.RemoteAddr.String()]
if !ok {
udpConn, err = net.DialUDP("udp", nil, dstAddr)
if err != nil {
mu.Unlock()
2017-03-13 02:44:47 +08:00
continue
}
udpConnMap[udpMsg.RemoteAddr.String()] = udpConn
}
mu.Unlock()
_, err = udpConn.Write(buf)
if err != nil {
udpConn.Close()
}
if !ok {
go writerFn(udpMsg.RemoteAddr, udpConn)
}
}
}()
2016-12-19 01:22:21 +08:00
}