mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Add a TCP proxy for debugging. v6.0.117 (#3958)
When debugging the RTMP protocol, we can capture packets using tcpdump and then replay the pcap file. For example: ```bash cd ~/git/srs/trunk/3rdparty/srs-bench/pcap tcpdump -i any -w t.pcap tcp port 1935 go run . -f ./t.pcap -s 127.0.0.1:1935 ``` However, sometimes due to poor network conditions between the server and the client, there may be many retransmitted packets. In such cases, setting up a transparent TCP proxy that listens on port 1935 and forwards to port 19350 can be a solution: ```bash ./objs/srs -c conf/origin.conf cd 3rdparty/srs-bench/tcpproxy/ && go run main.go tcpdump -i any -w t.pcap tcp port 19350 ``` This approach allows for the implementation of packet dumping, multipoint replication, or the provision of detailed timestamps and byte information at the proxy. It enables the collection of debugging information without the need to modify the server. --------- `TRANS_BY_GPT4` --------- Co-authored-by: john <hondaxiao@tencent.com>
This commit is contained in:
parent
26f4ab9923
commit
ce2ce1542f
4 changed files with 176 additions and 6 deletions
20
trunk/3rdparty/srs-bench/pcap/main.go
vendored
20
trunk/3rdparty/srs-bench/pcap/main.go
vendored
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/gopacket"
|
"github.com/google/gopacket"
|
||||||
|
@ -62,9 +63,19 @@ func doMain(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions)
|
var source *gopacket.PacketSource
|
||||||
if err != nil {
|
if strings.HasSuffix(filename, ".pcap") {
|
||||||
return errors.Wrapf(err, "new reader")
|
r, err := pcapgo.NewReader(f)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "new reader")
|
||||||
|
}
|
||||||
|
source = gopacket.NewPacketSource(r, r.LinkType())
|
||||||
|
} else {
|
||||||
|
r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "new reader")
|
||||||
|
}
|
||||||
|
source = gopacket.NewPacketSource(r, r.LinkType())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Should start a goroutine to consume bytes from conn.
|
// TODO: FIXME: Should start a goroutine to consume bytes from conn.
|
||||||
|
@ -76,7 +87,6 @@ func doMain(ctx context.Context) error {
|
||||||
|
|
||||||
var packetNumber uint64
|
var packetNumber uint64
|
||||||
var previousTime *time.Time
|
var previousTime *time.Time
|
||||||
source := gopacket.NewPacketSource(r, r.LinkType())
|
|
||||||
for packet := range source.Packets() {
|
for packet := range source.Packets() {
|
||||||
packetNumber++
|
packetNumber++
|
||||||
|
|
||||||
|
@ -90,7 +100,7 @@ func doMain(ctx context.Context) error {
|
||||||
if len(payload) == 0 {
|
if len(payload) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if tcp.DstPort != 1935 {
|
if tcp.DstPort != 1935 && tcp.DstPort != 19350 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
159
trunk/3rdparty/srs-bench/tcpproxy/main.go
vendored
Normal file
159
trunk/3rdparty/srs-bench/tcpproxy/main.go
vendored
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := doMain(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func doMain() error {
|
||||||
|
hashID := buildHashID()
|
||||||
|
|
||||||
|
listener, err := net.Listen("tcp", ":1935")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
trace(hashID, "Listen at %v", listener.Addr())
|
||||||
|
|
||||||
|
for {
|
||||||
|
client, err := listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
backend, err := net.Dial("tcp", "localhost:19350")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go serve(client, backend)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serve(client, backend net.Conn) {
|
||||||
|
defer client.Close()
|
||||||
|
defer backend.Close()
|
||||||
|
hashID := buildHashID()
|
||||||
|
if err := doServe(hashID, client, backend); err != nil {
|
||||||
|
trace(hashID, "Serve error %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func doServe(hashID string, client, backend net.Conn) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var r0 error
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if c, ok := client.(*net.TCPConn); ok {
|
||||||
|
c.SetNoDelay(true)
|
||||||
|
}
|
||||||
|
if c, ok := backend.(*net.TCPConn); ok {
|
||||||
|
c.SetNoDelay(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
buf := make([]byte, 128*1024)
|
||||||
|
nn, err := client.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
trace(hashID, "Read from client error %v", err)
|
||||||
|
r0 = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if nn == 0 {
|
||||||
|
trace(hashID, "Read from client EOF")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = backend.Write(buf[:nn])
|
||||||
|
if err != nil {
|
||||||
|
trace(hashID, "Write to RTMP backend error %v", err)
|
||||||
|
r0 = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
trace(hashID, "Copy %v bytes to RTMP backend", nn)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
buf := make([]byte, 128*1024)
|
||||||
|
nn, err := backend.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
trace(hashID, "Read from RTMP backend error %v", err)
|
||||||
|
r0 = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if nn == 0 {
|
||||||
|
trace(hashID, "Read from RTMP backend EOF")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = client.Write(buf[:nn])
|
||||||
|
if err != nil {
|
||||||
|
trace(hashID, "Write to client error %v", err)
|
||||||
|
r0 = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
trace(hashID, "Copy %v bytes to RTMP client", nn)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
defer client.Close()
|
||||||
|
defer backend.Close()
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
trace(hashID, "Context is done, close the connections")
|
||||||
|
}()
|
||||||
|
|
||||||
|
trace(hashID, "Start proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())
|
||||||
|
wg.Wait()
|
||||||
|
trace(hashID, "Finish proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
func trace(id, msg string, a ...interface{}) {
|
||||||
|
fmt.Println(fmt.Sprintf("[%v][%v] %v",
|
||||||
|
time.Now().Format("2006-01-02 15:04:05.000"), id,
|
||||||
|
fmt.Sprintf(msg, a...),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildHashID() string {
|
||||||
|
randomData := make([]byte, 16)
|
||||||
|
if _, err := rand.Read(randomData); err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := sha256.Sum256(randomData)
|
||||||
|
return hex.EncodeToString(hash[:])[:6]
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ The changelog for SRS.
|
||||||
<a name="v6-changes"></a>
|
<a name="v6-changes"></a>
|
||||||
|
|
||||||
## SRS 6.0 Changelog
|
## SRS 6.0 Changelog
|
||||||
|
* v6.0, 2024-03-19, Merge [#3958](https://github.com/ossrs/srs/pull/3958): Add a TCP proxy for debugging. v6.0.117 (#3958)
|
||||||
* v6.0, 2024-03-20, Merge [#3964](https://github.com/ossrs/srs/pull/3964): WebRTC: Add support for A/V only WHEP/WHEP player. v6.0.116 (#3964)
|
* v6.0, 2024-03-20, Merge [#3964](https://github.com/ossrs/srs/pull/3964): WebRTC: Add support for A/V only WHEP/WHEP player. v6.0.116 (#3964)
|
||||||
* v6.0, 2024-03-19, Merge [#3990](https://github.com/ossrs/srs/pull/3990): System: Disable feature that obtains versions and check features status. v6.0.115 (#3990)
|
* v6.0, 2024-03-19, Merge [#3990](https://github.com/ossrs/srs/pull/3990): System: Disable feature that obtains versions and check features status. v6.0.115 (#3990)
|
||||||
* v6.0, 2024-03-18, Merge [#3973](https://github.com/ossrs/srs/pull/3973): Typo: Fix some typo for #3973 #3976 #3982. v6.0.114 (#3973)
|
* v6.0, 2024-03-18, Merge [#3973](https://github.com/ossrs/srs/pull/3973): Typo: Fix some typo for #3973 #3976 #3982. v6.0.114 (#3973)
|
||||||
|
|
|
@ -9,6 +9,6 @@
|
||||||
|
|
||||||
#define VERSION_MAJOR 6
|
#define VERSION_MAJOR 6
|
||||||
#define VERSION_MINOR 0
|
#define VERSION_MINOR 0
|
||||||
#define VERSION_REVISION 116
|
#define VERSION_REVISION 117
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue