1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Support redis load balancer.

This commit is contained in:
winlin 2024-08-29 17:19:37 +08:00
parent b8ded5a681
commit 3269228e64
8 changed files with 324 additions and 77 deletions

View file

@ -207,7 +207,9 @@ func (v *systemAPI) Run(ctx context.Context) error {
srs.SRT, srs.RTC = srt, rtc
srs.UpdatedAt = time.Now()
})
srsLoadBalancer.Update(server)
if err := srsLoadBalancer.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update SRS server %+v", server)
}
logger.Df(ctx, "Register SRS media server, %+v", server)
return nil

View file

@ -49,6 +49,14 @@ func setupDefaultEnv(ctx context.Context) {
// The load balancer, use redis or memory.
setEnvDefault("PROXY_LOAD_BALANCER_TYPE", "redis")
// The redis server host.
setEnvDefault("PROXY_REDIS_HOST", "127.0.0.1")
// The redis server port.
setEnvDefault("PROXY_REDIS_PORT", "6379")
// The redis server password.
setEnvDefault("PROXY_REDIS_PASSWORD", "")
// The redis server db.
setEnvDefault("PROXY_REDIS_DB", "0")
// Whether enable the default backend server, for debugging.
setEnvDefault("PROXY_DEFAULT_BACKEND_ENABLED", "off")
@ -62,12 +70,81 @@ func setupDefaultEnv(ctx context.Context) {
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+
"PROXY_SYSTEM_API=%v, PROXY_DEFAULT_BACKEND_ENABLED=%v, "+
"PROXY_DEFAULT_BACKEND_IP=%v, PROXY_DEFAULT_BACKEND_RTMP=%v, "+
"PROXY_LOAD_BALANCER_TYPE=%v",
"PROXY_LOAD_BALANCER_TYPE=%v, PROXY_REDIS_HOST=%v, PROXY_REDIS_PORT=%v, "+
"PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v",
envGoPprof(),
envForceQuitTimeout(), envGraceQuitTimeout(),
envHttpAPI(), envHttpServer(), envRtmpServer(),
envSystemAPI(), envDefaultBackendEnabled(),
envDefaultBackendIP(), envDefaultBackendRTMP(),
envLoadBalancerType(),
envLoadBalancerType(), envRedisHost(), envRedisPort(),
envRedisPassword(), envRedisDB(),
)
}
func envRedisDB() string {
return os.Getenv("PROXY_REDIS_DB")
}
func envRedisPassword() string {
return os.Getenv("PROXY_REDIS_PASSWORD")
}
func envRedisPort() string {
return os.Getenv("PROXY_REDIS_PORT")
}
func envRedisHost() string {
return os.Getenv("PROXY_REDIS_HOST")
}
func envLoadBalancerType() string {
return os.Getenv("PROXY_LOAD_BALANCER_TYPE")
}
func envDefaultBackendRTMP() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_RTMP")
}
func envDefaultBackendIP() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_IP")
}
func envDefaultBackendEnabled() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_ENABLED")
}
func envGraceQuitTimeout() string {
return os.Getenv("PROXY_GRACE_QUIT_TIMEOUT")
}
func envForceQuitTimeout() string {
return os.Getenv("PROXY_FORCE_QUIT_TIMEOUT")
}
func envGoPprof() string {
return os.Getenv("GO_PPROF")
}
func envSystemAPI() string {
return os.Getenv("PROXY_SYSTEM_API")
}
func envRtmpServer() string {
return os.Getenv("PROXY_RTMP_SERVER")
}
func envHttpServer() string {
return os.Getenv("PROXY_HTTP_SERVER")
}
func envHttpAPI() string {
return os.Getenv("PROXY_HTTP_API")
}
// setEnvDefault set env key=value if not set.
func setEnvDefault(key, value string) {
if os.Getenv(key) == "" {
os.Setenv(key, value)
}
}

View file

@ -2,4 +2,9 @@ module srs-proxy
go 1.18
require github.com/joho/godotenv v1.5.1 // indirect
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/joho/godotenv v1.5.1 // indirect
)

View file

@ -1,2 +1,8 @@
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=

View file

@ -51,6 +51,8 @@ func doMain(ctx context.Context) error {
switch lbType := envLoadBalancerType(); lbType {
case "memory":
srsLoadBalancer = NewMemoryLoadBalancer()
case "redis":
srsLoadBalancer = NewRedisLoadBalancer()
default:
return errors.Errorf("invalid load balancer %v", lbType)
}

View file

@ -298,9 +298,14 @@ func (v *RTMPClient) Close() error {
}
func (v *RTMPClient) Connect(ctx context.Context, tcUrl, streamName string) error {
// Build the stream URL in vhost/app/stream schema.
streamURL, err := buildStreamURL(fmt.Sprintf("%v/%v", tcUrl, streamName))
if err != nil {
return errors.Wrapf(err, "build stream url %v/%v", tcUrl, streamName)
}
// Pick a backend SRS server to proxy the RTMP stream.
streamURL := fmt.Sprintf("%v/%v", tcUrl, streamName)
backend, err := srsLoadBalancer.Pick(streamURL)
backend, err := srsLoadBalancer.Pick(ctx, streamURL)
if err != nil {
return errors.Wrapf(err, "pick backend for %v", streamURL)
}

View file

@ -5,40 +5,48 @@ package main
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"srs-proxy/errors"
"srs-proxy/logger"
"strconv"
"strings"
"time"
// Use v8 because we use Go 1.16+, while v9 requires Go 1.18+
"github.com/go-redis/redis/v8"
"srs-proxy/errors"
"srs-proxy/logger"
"srs-proxy/sync"
)
// If server heartbeat in this duration, it's alive.
const srsServerAliveDuration = 300 * time.Second
type SRSServer struct {
// The server IP.
IP string
IP string `json:"ip,omitempty"`
// The server device ID, configured by user.
DeviceID string
DeviceID string `json:"device_id,omitempty"`
// The server id of SRS, store in file, may not change, mandatory.
ServerID string
ServerID string `json:"server_id,omitempty"`
// The service id of SRS, always change when restarted, mandatory.
ServiceID string
ServiceID string `json:"service_id,omitempty"`
// The process id of SRS, always change when restarted, mandatory.
PID string
PID string `json:"pid,omitempty"`
// The RTMP listen endpoints.
RTMP []string
RTMP []string `json:"rtmp,omitempty"`
// The HTTP Stream listen endpoints.
HTTP []string
HTTP []string `json:"http,omitempty"`
// The HTTP API listen endpoints.
API []string
API []string `json:"api,omitempty"`
// The SRT server listen endpoints.
SRT []string
SRT []string `json:"srt,omitempty"`
// The RTC server listen endpoints.
RTC []string
RTC []string `json:"rtc,omitempty"`
// Last update time.
UpdatedAt time.Time
UpdatedAt time.Time `json:"update_at,omitempty"`
}
func (v *SRSServer) ID() string {
@ -120,9 +128,9 @@ type SRSLoadBalancer interface {
// Initialize the load balancer.
Initialize(ctx context.Context) error
// Update the backer server.
Update(server *SRSServer)
Update(ctx context.Context, server *SRSServer) error
// Pick a backend server for the specified stream URL.
Pick(streamURL string) (*SRSServer, error)
Pick(ctx context.Context, streamURL string) (*SRSServer, error)
}
// srsLoadBalancer is the global SRS load balancer.
@ -144,17 +152,20 @@ func (v *srsMemoryLoadBalancer) Initialize(ctx context.Context) error {
if server, err := NewDefaultSRSForDebugging(); err != nil {
return errors.Wrapf(err, "initialize default SRS")
} else if server != nil {
v.Update(server)
if err := v.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update default SRS %+v", server)
}
logger.Df(ctx, "Initialize default SRS media server, %+v", server)
}
return nil
}
func (v *srsMemoryLoadBalancer) Update(server *SRSServer) {
func (v *srsMemoryLoadBalancer) Update(ctx context.Context, server *SRSServer) error {
v.servers.Store(server.ID(), server)
return nil
}
func (v *srsMemoryLoadBalancer) Pick(streamURL string) (*SRSServer, error) {
func (v *srsMemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSServer, error) {
// Always proxy to the same server for the same stream URL.
if server, ok := v.picked.Load(streamURL); ok {
return server, nil
@ -163,7 +174,7 @@ func (v *srsMemoryLoadBalancer) Pick(streamURL string) (*SRSServer, error) {
// Gather all servers, alive in 60s ago.
var servers []*SRSServer
v.servers.Range(func(key string, server *SRSServer) bool {
if time.Since(server.UpdatedAt) < 60*time.Second {
if time.Since(server.UpdatedAt) < srsServerAliveDuration {
servers = append(servers, server)
}
return true
@ -187,3 +198,168 @@ func (v *srsMemoryLoadBalancer) Pick(streamURL string) (*SRSServer, error) {
v.picked.Store(streamURL, server)
return server, nil
}
type srsRedisLoadBalancer struct {
// The redis client sdk.
rdb *redis.Client
}
func NewRedisLoadBalancer() SRSLoadBalancer {
return &srsRedisLoadBalancer{}
}
func (v *srsRedisLoadBalancer) Initialize(ctx context.Context) error {
redisDatabase, err := strconv.Atoi(envRedisDB())
if err != nil {
return errors.Wrapf(err, "invalid PROXY_REDIS_DB %v", envRedisDB())
}
rdb := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", envRedisHost(), envRedisPort()),
Password: envRedisPassword(),
DB: redisDatabase,
})
v.rdb = rdb
if err := rdb.Ping(ctx).Err(); err != nil {
return errors.Wrapf(err, "unable to connect to redis %v", rdb.String())
}
logger.Df(ctx, "connected to redis %v ok", rdb.String())
if server, err := NewDefaultSRSForDebugging(); err != nil {
return errors.Wrapf(err, "initialize default SRS")
} else if server != nil {
if err := v.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update default SRS %+v", server)
}
// Keep alive.
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(30 * time.Second):
if err := v.Update(ctx, server); err != nil {
logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err)
}
}
}
}()
logger.Df(ctx, "Initialize default SRS media server, %+v", server)
}
return nil
}
func (v *srsRedisLoadBalancer) Update(ctx context.Context, server *SRSServer) error {
b, err := json.Marshal(server)
if err != nil {
return errors.Wrapf(err, "marshal server %+v", server)
}
key := fmt.Sprintf("srs-proxy-server:%v", server.ID())
if err = v.rdb.Set(ctx, key, b, srsServerAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v server %+v", key, server)
}
// Query all servers from redis, in json string.
var serverKeys []string
if b, err := v.rdb.Get(ctx, v.redisKeyServers()).Bytes(); err == nil {
if err := json.Unmarshal(b, &serverKeys); err != nil {
return errors.Wrapf(err, "unmarshal key=%v servers %v", v.redisKeyServers(), string(b))
}
}
// Check each server expiration, if not exists in redis, remove from servers.
for i, serverKey := range serverKeys {
if _, err := v.rdb.Get(ctx, serverKey).Bytes(); err != nil {
serverKeys = append(serverKeys[:i], serverKeys[i+1:]...)
continue
}
}
// Add server to servers if not exists.
var found bool
for _, serverKey := range serverKeys {
if serverKey == key {
found = true
break
}
}
if !found {
serverKeys = append(serverKeys, key)
}
// Update all servers to redis.
b, err = json.Marshal(serverKeys)
if err != nil {
return errors.Wrapf(err, "marshal servers %+v", serverKeys)
}
if err = v.rdb.Set(ctx, v.redisKeyServers(), b, 0).Err(); err != nil {
return errors.Wrapf(err, "set key=%v servers %+v", v.redisKeyServers(), serverKeys)
}
return nil
}
func (v *srsRedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSServer, error) {
key := fmt.Sprintf("srs-proxy-url:%v", streamURL)
// Always proxy to the same server for the same stream URL.
if serverKey, err := v.rdb.Get(ctx, key).Result(); err == nil {
// If server not exists, ignore and pick another server for the stream URL.
if b, err := v.rdb.Get(ctx, serverKey).Bytes(); err == nil && len(b) > 0 {
var server SRSServer
if err := json.Unmarshal(b, &server); err != nil {
return nil, errors.Wrapf(err, "unmarshal key=%v server %v", key, string(b))
}
// TODO: If server fail, we should migrate the streams to another server.
return &server, nil
}
}
// Query all servers from redis, in json string.
var serverKeys []string
if b, err := v.rdb.Get(ctx, v.redisKeyServers()).Bytes(); err == nil {
if err := json.Unmarshal(b, &serverKeys); err != nil {
return nil, errors.Wrapf(err, "unmarshal key=%v servers %v", v.redisKeyServers(), string(b))
}
}
// No server found, failed.
if len(serverKeys) == 0 {
return nil, fmt.Errorf("no server available for %v", streamURL)
}
// All server should be alive, if not, should have been removed by redis. So we only
// random pick one that is always available.
var serverKey string
var server SRSServer
for i := 0; i < 3; i++ {
tryServerKey := serverKeys[rand.Intn(len(serverKeys))]
b, err := v.rdb.Get(ctx, tryServerKey).Bytes()
if err == nil && len(b) > 0 {
if err := json.Unmarshal(b, &server); err != nil {
return nil, errors.Wrapf(err, "unmarshal key=%v server %v", serverKey, string(b))
}
serverKey = tryServerKey
break
}
}
if serverKey == "" {
return nil, errors.Errorf("no server available in %v for %v", serverKeys, streamURL)
}
// Update the picked server for the stream URL.
if err := v.rdb.Set(ctx, key, []byte(serverKey), 0).Err(); err != nil {
return nil, errors.Wrapf(err, "set key=%v server %v", key, serverKey)
}
return &server, nil
}
func (v *srsRedisLoadBalancer) redisKeyServers() string {
return fmt.Sprintf("srs-proxy-servers-all")
}

View file

@ -9,66 +9,17 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"net/url"
"reflect"
"strings"
"time"
"srs-proxy/errors"
"srs-proxy/logger"
)
// setEnvDefault set env key=value if not set.
func setEnvDefault(key, value string) {
if os.Getenv(key) == "" {
os.Setenv(key, value)
}
}
func envHttpAPI() string {
return os.Getenv("PROXY_HTTP_API")
}
func envHttpServer() string {
return os.Getenv("PROXY_HTTP_SERVER")
}
func envRtmpServer() string {
return os.Getenv("PROXY_RTMP_SERVER")
}
func envSystemAPI() string {
return os.Getenv("PROXY_SYSTEM_API")
}
func envGoPprof() string {
return os.Getenv("GO_PPROF")
}
func envForceQuitTimeout() string {
return os.Getenv("PROXY_FORCE_QUIT_TIMEOUT")
}
func envGraceQuitTimeout() string {
return os.Getenv("PROXY_GRACE_QUIT_TIMEOUT")
}
func envDefaultBackendEnabled() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_ENABLED")
}
func envDefaultBackendIP() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_IP")
}
func envDefaultBackendRTMP() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_RTMP")
}
func envLoadBalancerType() string {
return os.Getenv("PROXY_LOAD_BALANCER_TYPE")
}
func apiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, data any) {
w.Header().Set("Server", fmt.Sprintf("%v/%v", Signature(), Version()))
@ -116,3 +67,26 @@ func ParseBody(r io.ReadCloser, v interface{}) error {
return nil
}
// buildStreamURL build as vhost/app/stream for stream URL r.
func buildStreamURL(r string) (string, error) {
u, err := url.Parse(r)
if err != nil {
return "", errors.Wrapf(err, "parse url %v", r)
}
// If not domain or ip in hostname, it's __defaultVhost__.
defaultVhost := !strings.Contains(u.Hostname(), ".")
// If hostname is actually an IP address, it's __defaultVhost__.
if ip := net.ParseIP(u.Hostname()); ip.To4() != nil {
defaultVhost = true
}
if defaultVhost {
return fmt.Sprintf("__defaultVhost__%v", u.Path), nil
}
// Ignore port, only use hostname as vhost.
return fmt.Sprintf("%v%v", u.Hostname(), u.Path), nil
}