1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refine code to files.

This commit is contained in:
winlin 2024-08-26 17:34:14 +08:00
parent 2eae020a94
commit 650befdfa4
8 changed files with 194 additions and 92 deletions

20
proxy/debug.go Normal file
View file

@ -0,0 +1,20 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main
import (
"context"
"net/http"
"srs-proxy/logger"
)
func handleGoPprof(ctx context.Context) {
if addr := envGoPprof(); addr != "" {
go func() {
logger.Df(ctx, "Start Go pprof at %v", addr)
http.ListenAndServe(addr, nil)
}()
}
}

53
proxy/env.go Normal file
View file

@ -0,0 +1,53 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main
import (
"context"
"os"
"path"
"srs-proxy/errors"
"srs-proxy/logger"
"github.com/joho/godotenv"
)
// loadEnvFile loads the environment variables from file. Note that we only use .env file.
func loadEnvFile(ctx context.Context) error {
if workDir, err := os.Getwd(); err != nil {
return errors.Wrapf(err, "getpwd")
} else {
envFile := path.Join(workDir, ".env")
if _, err := os.Stat(envFile); err == nil {
if err := godotenv.Overload(envFile); err != nil {
return errors.Wrapf(err, "load %v", envFile)
}
}
}
return nil
}
func setupDefaultEnv(ctx context.Context) {
// Whether enable the Go pprof.
setEnvDefault("GO_PPROF", "")
// Force shutdown timeout.
setEnvDefault("PROXY_FORCE_QUIT_TIMEOUT", "30s")
// Graceful quit timeout.
setEnvDefault("PROXY_GRACE_QUIT_TIMEOUT", "20s")
// The HTTP API server.
setEnvDefault("PROXY_HTTP_API", "1985")
// The HTTP web server.
setEnvDefault("PROXY_HTTP_SERVER", "8080")
logger.Df(ctx, "load .env as GO_PPROF=%v, "+
"PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v",
envGoPprof(),
envForceQuitTimeout(), envGraceQuitTimeout(),
envHttpAPI(), envHttpServer(),
)
}

View file

@ -11,16 +11,21 @@ import (
"srs-proxy/logger"
"strings"
"time"
"srs-proxy/errors"
)
type httpServer struct {
// The underlayer HTTP server.
server *http.Server
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
}
func NewHttpServer() *httpServer {
return &httpServer{}
func NewHttpServer(opts ...func(*httpServer)) *httpServer {
v := &httpServer{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *httpServer) Close() error {
@ -28,14 +33,6 @@ func (v *httpServer) Close() error {
}
func (v *httpServer) ListenAndServe(ctx context.Context) error {
// Parse the gracefully quit timeout.
var gracefulQuitTimeout time.Duration
if t, err := time.ParseDuration(envGraceQuitTimeout()); err != nil {
return errors.Wrapf(err, "parse duration %v", envGraceQuitTimeout())
} else {
gracefulQuitTimeout = t
}
// Parse address to listen.
addr := envHttpServer()
if !strings.Contains(addr, ":") {
@ -51,7 +48,7 @@ func (v *httpServer) ListenAndServe(ctx context.Context) error {
ctxParent := ctx
<-ctxParent.Done()
ctx, cancel := context.WithTimeout(context.Background(), gracefulQuitTimeout)
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
@ -60,7 +57,7 @@ func (v *httpServer) ListenAndServe(ctx context.Context) error {
// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
res := struct {
type Response struct {
Code int `json:"code"`
PID string `json:"pid"`
Data struct {
@ -69,10 +66,9 @@ func (v *httpServer) ListenAndServe(ctx context.Context) error {
Revision int `json:"revision"`
Version string `json:"version"`
} `json:"data"`
}{}
}
res.Code = 0
res.PID = fmt.Sprintf("%v", os.Getpid())
res := Response{Code: 0, PID: fmt.Sprintf("%v", os.Getpid())}
res.Data.Major = VersionMajor()
res.Data.Minor = VersionMinor()
res.Data.Revision = VersionRevision()

View file

@ -19,8 +19,12 @@ type loggerPlus struct {
level string
}
func newLoggerPlus(l *stdLog.Logger, level string) *loggerPlus {
return &loggerPlus{logger: l, level: level}
func newLoggerPlus(opts ...func(*loggerPlus)) *loggerPlus {
v := &loggerPlus{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *loggerPlus) Printf(ctx context.Context, f string, a ...interface{}) {
@ -64,8 +68,20 @@ const (
)
func init() {
verboseLogger = newLoggerPlus(stdLog.New(ioutil.Discard, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds), logVerboseLabel)
debugLogger = newLoggerPlus(stdLog.New(os.Stdout, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds), logDebugLabel)
warnLogger = newLoggerPlus(stdLog.New(os.Stderr, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds), logWarnLabel)
errorLogger = newLoggerPlus(stdLog.New(os.Stderr, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds), logErrorLabel)
verboseLogger = newLoggerPlus(func(logger *loggerPlus) {
logger.logger = stdLog.New(ioutil.Discard, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds)
logger.level = logVerboseLabel
})
debugLogger = newLoggerPlus(func(logger *loggerPlus) {
logger.logger = stdLog.New(os.Stdout, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds)
logger.level = logDebugLabel
})
warnLogger = newLoggerPlus(func(logger *loggerPlus) {
logger.logger = stdLog.New(os.Stderr, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds)
logger.level = logWarnLabel
})
errorLogger = newLoggerPlus(func(logger *loggerPlus) {
logger.logger = stdLog.New(os.Stderr, "", stdLog.Ldate|stdLog.Ltime|stdLog.Lmicroseconds)
logger.level = logErrorLabel
})
}

View file

@ -5,104 +5,57 @@ package main
import (
"context"
"net/http"
"os"
"os/signal"
"path"
"syscall"
"time"
"srs-proxy/errors"
"srs-proxy/logger"
"github.com/joho/godotenv"
)
func main() {
ctx := logger.WithContext(context.Background())
logger.Df(ctx, "SRS %v/%v started", Signature(), Version())
logger.Df(ctx, "%v/%v started", Signature(), Version())
// Install signals.
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
ctx, cancel := context.WithCancel(ctx)
go func() {
for s := range sc {
logger.Df(ctx, "Got signal %v", s)
cancel()
}
}()
installSignals(ctx, cancel)
// Start the main loop, ignore the user cancel error.
err := doMain(ctx)
if err != nil && ctx.Err() == context.Canceled {
if err != nil && ctx.Err() != context.Canceled {
logger.Ef(ctx, "main: %v", err)
os.Exit(-1)
}
logger.Df(ctx, "Server %v done", Signature())
logger.Df(ctx, "%v done", Signature())
}
func doMain(ctx context.Context) error {
// Load the environment variables from file. Note that we only use .env file.
if workDir, err := os.Getwd(); err != nil {
return errors.Wrapf(err, "getpwd")
} else {
envFile := path.Join(workDir, ".env")
if _, err := os.Stat(envFile); err == nil {
if err := godotenv.Overload(envFile); err != nil {
return errors.Wrapf(err, "load %v", envFile)
}
}
// Setup the environment variables.
if err := loadEnvFile(ctx); err != nil {
return errors.Wrapf(err, "load env")
}
// Whether enable the Go pprof.
setEnvDefault("GO_PPROF", "")
// Force shutdown timeout.
setEnvDefault("PROXY_FORCE_QUIT_TIMEOUT", "30s")
// Graceful quit timeout.
setEnvDefault("PROXY_GRACE_QUIT_TIMEOUT", "20s")
// The HTTP API server.
setEnvDefault("PROXY_HTTP_API", "1985")
// The HTTP web server.
setEnvDefault("PROXY_HTTP_SERVER", "8080")
logger.Df(ctx, "load .env as GO_PPROF=%v, "+
"PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v",
envGoPprof(),
envForceQuitTimeout(), envGraceQuitTimeout(),
envHttpAPI(), envHttpServer(),
)
setupDefaultEnv(ctx)
// When cancelled, the program is forced to exit due to a timeout. Normally, this doesn't occur
// because the main thread exits after the context is cancelled. However, sometimes the main thread
// may be blocked for some reason, so a forced exit is necessary to ensure the program terminates.
var forceTimeout time.Duration
if t, err := time.ParseDuration(envForceQuitTimeout()); err != nil {
return errors.Wrapf(err, "parse force timeout %v", envForceQuitTimeout())
} else {
forceTimeout = t
if err := installForceQuit(ctx); err != nil {
return errors.Wrapf(err, "install force quit")
}
go func() {
<-ctx.Done()
time.Sleep(forceTimeout)
logger.Wf(ctx, "Force to exit by timeout")
os.Exit(1)
}()
// Start the Go pprof if enabled.
if addr := envGoPprof(); addr != "" {
go func() {
logger.Df(ctx, "Start Go pprof at %v", addr)
http.ListenAndServe(addr, nil)
}()
handleGoPprof(ctx)
// Parse the gracefully quit timeout.
gracefulQuitTimeout, err := parseGracefullyQuitTimeout()
if err != nil {
return errors.Wrapf(err, "parse gracefully quit timeout")
}
// Start the HTTP web server.
httpServer := NewHttpServer()
httpServer := NewHttpServer(func(server *httpServer) {
server.gracefulQuitTimeout = gracefulQuitTimeout
})
defer httpServer.Close()
if err := httpServer.ListenAndServe(ctx); err != nil {
return errors.Wrapf(err, "http server")

44
proxy/signal.go Normal file
View file

@ -0,0 +1,44 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main
import (
"context"
"os"
"os/signal"
"srs-proxy/errors"
"syscall"
"time"
"srs-proxy/logger"
)
func installSignals(ctx context.Context, cancel context.CancelFunc) {
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
go func() {
for s := range sc {
logger.Df(ctx, "Got signal %v", s)
cancel()
}
}()
}
func installForceQuit(ctx context.Context) error {
var forceTimeout time.Duration
if t, err := time.ParseDuration(envForceQuitTimeout()); err != nil {
return errors.Wrapf(err, "parse force timeout %v", envForceQuitTimeout())
} else {
forceTimeout = t
}
go func() {
<-ctx.Done()
time.Sleep(forceTimeout)
logger.Wf(ctx, "Force to exit by timeout")
os.Exit(1)
}()
return nil
}

View file

@ -6,9 +6,13 @@ package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"reflect"
"srs-proxy/errors"
"srs-proxy/logger"
"time"
)
// setEnvDefault set env key=value if not set.
@ -39,9 +43,16 @@ func envGraceQuitTimeout() string {
}
func apiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, data any) {
w.Header().Set("Server", fmt.Sprintf("%v/%v", Signature(), Version()))
b, err := json.Marshal(data)
if err != nil {
logger.Wf(ctx, "marshal %v err %v", data, err)
msg := fmt.Sprintf("marshal %v %v err %v", reflect.TypeOf(data), data, err)
logger.Wf(ctx, msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintln(w, msg)
return
}
@ -49,3 +60,11 @@ func apiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, da
w.WriteHeader(http.StatusOK)
w.Write(b)
}
func parseGracefullyQuitTimeout() (time.Duration, error) {
if t, err := time.ParseDuration(envGraceQuitTimeout()); err != nil {
return 0, errors.Wrapf(err, "parse duration %v", envGraceQuitTimeout())
} else {
return t, nil
}
}

View file

@ -9,8 +9,9 @@ func VersionMajor() int {
return 1
}
// VersionMinor specifies the typical version of SRS we adapt to.
func VersionMinor() int {
return 0
return 5
}
func VersionRevision() int {
@ -22,5 +23,5 @@ func Version() string {
}
func Signature() string {
return "GoProxy"
return "SRSProxy"
}