1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

edge support play and ingest origin stream. change to 0.9.77

This commit is contained in:
winlin 2014-04-26 21:41:18 +08:00
parent acba4cfdc6
commit 2bcaeccc51
17 changed files with 406 additions and 59 deletions

View file

@ -893,20 +893,23 @@ vhost refer.anti_suck.com {
pithy_print { pithy_print {
# shared print interval for all publish clients, in milliseconds. # shared print interval for all publish clients, in milliseconds.
# if not specified, set to 1100. # if not specified, set to 1100.
publish 2000; publish 10000;
# shared print interval for all play clients, in milliseconds. # shared print interval for all play clients, in milliseconds.
# if not specified, set to 1300. # if not specified, set to 1300.
play 3000; play 10000;
# shared print interval for all forwarders, in milliseconds. # shared print interval for all forwarders, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
forwarder 3000; forwarder 10000;
# shared print interval for all encoders, in milliseconds. # shared print interval for all encoders, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
encoder 3000; encoder 10000;
# shared print interval for all ingesters, in milliseconds. # shared print interval for all ingesters, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
ingester 3000; ingester 10000;
# shared print interval for all hls, in milliseconds. # shared print interval for all hls, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
hls 3000; hls 10000;
# shared print interval for all edge, in milliseconds.
# if not specified, set to 2000.
edge 10000;
} }

View file

@ -1396,6 +1396,21 @@ int SrsConfig::get_pithy_print_play()
return ::atoi(pithy->arg0().c_str()); return ::atoi(pithy->arg0().c_str());
} }
int SrsConfig::get_pithy_print_edge()
{
SrsConfDirective* pithy = root->get("pithy_print");
if (!pithy) {
return SRS_STAGE_EDGE_INTERVAL_MS;
}
pithy = pithy->get("edge");
if (!pithy) {
return SRS_STAGE_EDGE_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
SrsConfDirective* SrsConfig::get_vhost(string vhost) SrsConfDirective* SrsConfig::get_vhost(string vhost)
{ {
srs_assert(root); srs_assert(root);
@ -1821,6 +1836,17 @@ bool SrsConfig::get_vhost_is_edge(std::string vhost)
return true; return true;
} }
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
return conf->get("origin");
}
SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope) SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);

View file

@ -76,6 +76,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_ENCODER_INTERVAL_MS 2000 #define SRS_STAGE_ENCODER_INTERVAL_MS 2000
#define SRS_STAGE_INGESTER_INTERVAL_MS 2000 #define SRS_STAGE_INGESTER_INTERVAL_MS 2000
#define SRS_STAGE_HLS_INTERVAL_MS 2000 #define SRS_STAGE_HLS_INTERVAL_MS 2000
#define SRS_STAGE_EDGE_INTERVAL_MS 2000
#define SRS_AUTO_INGEST_TYPE_FILE "file" #define SRS_AUTO_INGEST_TYPE_FILE "file"
#define SRS_AUTO_INGEST_TYPE_STREAM "stream" #define SRS_AUTO_INGEST_TYPE_STREAM "stream"
@ -163,6 +164,7 @@ public:
virtual int get_pithy_print_ingester(); virtual int get_pithy_print_ingester();
virtual int get_pithy_print_hls(); virtual int get_pithy_print_hls();
virtual int get_pithy_print_play(); virtual int get_pithy_print_play();
virtual int get_pithy_print_edge();
// vhost specified section // vhost specified section
public: public:
virtual SrsConfDirective* get_vhost(std::string vhost); virtual SrsConfDirective* get_vhost(std::string vhost);
@ -193,6 +195,7 @@ public:
// vhost edge section // vhost edge section
public: public:
virtual bool get_vhost_is_edge(std::string vhost); virtual bool get_vhost_is_edge(std::string vhost);
virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost);
// vhost transcode section // vhost transcode section
public: public:
virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope);

View file

@ -23,28 +23,55 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_edge.hpp> #include <srs_app_edge.hpp>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_protocol_rtmp.hpp> #include <srs_protocol_rtmp.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_protocol_rtmp.hpp>
#include <srs_protocol_io.hpp>
#include <srs_app_config.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_rtmp.hpp>
#include <srs_app_socket.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_app_source.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_core_autofree.hpp>
// when error, edge ingester sleep for a while and retry. // when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
// when edge timeout, retry next.
#define SRS_EDGE_TIMEOUT_US (int64_t)(3*1000*1000LL)
SrsEdgeIngester::SrsEdgeIngester() SrsEdgeIngester::SrsEdgeIngester()
{ {
io = NULL;
client = NULL;
_edge = NULL; _edge = NULL;
_req = NULL; _req = NULL;
origin_index = 0;
stream_id = 0;
stfd = NULL;
pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US); pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US);
} }
SrsEdgeIngester::~SrsEdgeIngester() SrsEdgeIngester::~SrsEdgeIngester()
{ {
stop();
srs_freep(pthread);
} }
int SrsEdgeIngester::initialize(SrsEdge* edge, SrsRequest* req) int SrsEdgeIngester::initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
_source = source;
_edge = edge; _edge = edge;
_req = req; _req = req;
@ -53,14 +80,229 @@ int SrsEdgeIngester::initialize(SrsEdge* edge, SrsRequest* req)
int SrsEdgeIngester::start() int SrsEdgeIngester::start()
{ {
int ret = ERROR_SUCCESS; return pthread->start();
return ret; }
//return pthread->start();
void SrsEdgeIngester::stop()
{
pthread->stop();
close_underlayer_socket();
srs_freep(client);
srs_freep(io);
} }
int SrsEdgeIngester::cycle() int SrsEdgeIngester::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if ((ret = connect_server()) != ERROR_SUCCESS) {
return ret;
}
srs_assert(client);
client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_SEND_TIMEOUT_US);
SrsRequest* req = _req;
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->connect_app(req->app, req->tcUrl)) != ERROR_SUCCESS) {
srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret);
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}
if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
if ((ret = _source->on_publish()) != ERROR_SUCCESS) {
srs_error("edge ingester play stream then publish to edge failed. ret=%d", ret);
return ret;
}
if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = ingest()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsEdgeIngester::ingest()
{
int ret = ERROR_SUCCESS;
client->set_recv_timeout(SRS_EDGE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
while (pthread->can_loop()) {
// switch to other st-threads.
st_usleep(0);
pithy_print.elapse();
// pithy print
if (pithy_print.can_print()) {
srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
}
// read from client.
SrsCommonMessage* msg = NULL;
if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv origin server message failed. ret=%d", ret);
return ret;
}
srs_verbose("edge loop recv message. ret=%d", ret);
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg, false);
if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
}
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
SrsSource* source = _source;
// process audio packet
if (msg->header.is_audio()) {
if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("source process audio message failed. ret=%d", ret);
return ret;
}
}
// process video packet
if (msg->header.is_video()) {
if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
srs_error("source process video message failed. ret=%d", ret);
return ret;
}
}
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
if ((ret = msg->decode_packet(client->get_protocol())) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
srs_error("source process onMetaData message failed. ret=%d", ret);
return ret;
}
srs_trace("process onMetaData message success.");
return ret;
}
srs_trace("ignore AMF0/AMF3 data message.");
return ret;
}
return ret;
}
void SrsEdgeIngester::close_underlayer_socket()
{
srs_close_stfd(stfd);
}
int SrsEdgeIngester::connect_server()
{
int ret = ERROR_SUCCESS;
// reopen
close_underlayer_socket();
// TODO: FIXME: support reload
SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
srs_assert(conf);
// select the origin.
std::string server = conf->args.at(origin_index % conf->args.size());
origin_index = (origin_index + 1) % conf->args.size();
std::string s_port = RTMP_DEFAULT_PORT;
int port = ::atoi(RTMP_DEFAULT_PORT);
size_t pos = server.find(":");
if (pos != std::string::npos) {
s_port = server.substr(pos + 1);
server = server.substr(0, pos);
port = ::atoi(s_port.c_str());
}
// open socket.
srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
// TODO: FIXME: extract utility method
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
srs_freep(client);
srs_freep(io);
io = new SrsSocket(stfd);
client = new SrsRtmpClient(io);
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
return ret;
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
return ret; return ret;
} }
@ -75,11 +317,11 @@ SrsEdge::~SrsEdge()
srs_freep(ingester); srs_freep(ingester);
} }
int SrsEdge::initialize(SrsRequest* req) int SrsEdge::initialize(SrsSource* source, SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if ((ret = ingester->initialize(this, req)) != ERROR_SUCCESS) { if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -106,3 +348,25 @@ int SrsEdge::on_client_play()
return ret; return ret;
} }
void SrsEdge::on_all_client_stop()
{
if (state == SrsEdgeStateIngestConnected) {
ingester->stop();
}
SrsEdgeState pstate = state;
state = SrsEdgeStateInit;
srs_trace("edge change from %d to state %d (init).", pstate, state);
}
int SrsEdge::on_ingest_play()
{
int ret = ERROR_SUCCESS;
SrsEdgeState pstate = state;
state = SrsEdgeStateIngestConnected;
srs_trace("edge change from %d to state %d (ingest connected).", pstate, state);
return ret;
}

View file

@ -30,10 +30,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp> #include <srs_core.hpp>
#include <srs_app_st.hpp>
#include <srs_app_thread.hpp> #include <srs_app_thread.hpp>
class SrsEdge; class SrsEdge;
class SrsSource;
class SrsRequest; class SrsRequest;
class SrsRtmpClient;
class SrsCommonMessage;
class ISrsProtocolReaderWriter;
/** /**
* the state of edge * the state of edge
@ -43,7 +48,10 @@ enum SrsEdgeState
SrsEdgeStateInit = 0, SrsEdgeStateInit = 0,
SrsEdgeStatePlay = 100, SrsEdgeStatePlay = 100,
SrsEdgeStatePublish, SrsEdgeStatePublish,
SrsEdgeStateConnected, // play stream from origin, ingest stream
SrsEdgeStateIngestConnected,
// publish stream to edge, forward to origin
SrsEdgeStateForwardConnected,
SrsEdgeStateAborting, SrsEdgeStateAborting,
SrsEdgeStateReloading, SrsEdgeStateReloading,
}; };
@ -54,18 +62,31 @@ enum SrsEdgeState
class SrsEdgeIngester : public ISrsThreadHandler class SrsEdgeIngester : public ISrsThreadHandler
{ {
private: private:
int stream_id;
private:
SrsSource* _source;
SrsEdge* _edge; SrsEdge* _edge;
SrsRequest* _req; SrsRequest* _req;
SrsThread* pthread; SrsThread* pthread;
st_netfd_t stfd;
ISrsProtocolReaderWriter* io;
SrsRtmpClient* client;
int origin_index;
public: public:
SrsEdgeIngester(); SrsEdgeIngester();
virtual ~SrsEdgeIngester(); virtual ~SrsEdgeIngester();
public: public:
virtual int initialize(SrsEdge* edge, SrsRequest* req); virtual int initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req);
virtual int start(); virtual int start();
virtual void stop();
// interface ISrsThreadHandler // interface ISrsThreadHandler
public: public:
virtual int cycle(); virtual int cycle();
private:
virtual int ingest();
virtual void close_underlayer_socket();
virtual int connect_server();
virtual int process_publish_message(SrsCommonMessage* msg);
}; };
/** /**
@ -80,11 +101,20 @@ public:
SrsEdge(); SrsEdge();
virtual ~SrsEdge(); virtual ~SrsEdge();
public: public:
virtual int initialize(SrsRequest* req); virtual int initialize(SrsSource* source, SrsRequest* req);
/** /**
* when client play stream on edge. * when client play stream on edge.
*/ */
virtual int on_client_play(); virtual int on_client_play();
/**
* when all client stopped play, disconnect to origin.
*/
virtual void on_all_client_stop();
public:
/**
* when ingester start to play stream.
*/
virtual int on_ingest_play();
}; };
#endif #endif

View file

@ -113,7 +113,7 @@ int SrsEncoder::cycle()
// pithy print // pithy print
encoder(); encoder();
pithy_print->elapse(SRS_RTMP_ENCODER_SLEEP_US / 1000); pithy_print->elapse();
return ret; return ret;
} }
@ -326,7 +326,7 @@ void SrsEncoder::encoder()
if (pithy_print->can_print()) { if (pithy_print->can_print()) {
// TODO: FIXME: show more info. // TODO: FIXME: show more info.
srs_trace("-> time=%"PRId64", encoders=%d, input=%s", srs_trace("-> time=%"PRId64", encoders=%d, input=%s",
pithy_print->get_age(), (int)ffmpegs.size(), input_stream_name.c_str()); pithy_print->age(), (int)ffmpegs.size(), input_stream_name.c_str());
} }
} }

View file

@ -83,6 +83,7 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
std::string s_port = RTMP_DEFAULT_PORT; std::string s_port = RTMP_DEFAULT_PORT;
port = ::atoi(RTMP_DEFAULT_PORT); port = ::atoi(RTMP_DEFAULT_PORT);
// TODO: FIXME: parse complex params
size_t pos = forward_server.find(":"); size_t pos = forward_server.find(":");
if (pos != std::string::npos) { if (pos != std::string::npos) {
s_port = forward_server.substr(pos + 1); s_port = forward_server.substr(pos + 1);
@ -310,6 +311,8 @@ int SrsForwarder::forward()
// switch to other st-threads. // switch to other st-threads.
st_usleep(0); st_usleep(0);
pithy_print.elapse();
// read from client. // read from client.
if (true) { if (true) {
SrsCommonMessage* msg = NULL; SrsCommonMessage* msg = NULL;
@ -330,6 +333,12 @@ int SrsForwarder::forward()
return ret; return ret;
} }
// pithy print
if (pithy_print.can_print()) {
srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
}
// ignore when no messages. // ignore when no messages.
if (count <= 0) { if (count <= 0) {
srs_verbose("no packets to forward."); srs_verbose("no packets to forward.");
@ -337,13 +346,6 @@ int SrsForwarder::forward()
} }
SrsAutoFree(SrsSharedPtrMessage*, msgs, true); SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
// pithy print
pithy_print.elapse(SRS_PULSE_TIMEOUT_US / 1000);
if (pithy_print.can_print()) {
srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
}
// all msgs to forward. // all msgs to forward.
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs[i];

View file

@ -1470,10 +1470,10 @@ void SrsHls::hls_mux()
{ {
// reportable // reportable
if (pithy_print->can_print()) { if (pithy_print->can_print()) {
srs_trace("-> time=%"PRId64"", pithy_print->get_age()); srs_trace("-> time=%"PRId64"", pithy_print->age());
} }
pithy_print->elapse(sample->cts); pithy_print->elapse();
} }
#endif #endif

View file

@ -188,7 +188,7 @@ int SrsIngester::cycle()
// pithy print // pithy print
ingester(); ingester();
pithy_print->elapse(SRS_AUTO_INGESTER_SLEEP_US / 1000); pithy_print->elapse();
return ret; return ret;
} }
@ -350,7 +350,7 @@ void SrsIngester::ingester()
} }
// TODO: FIXME: show more info. // TODO: FIXME: show more info.
srs_trace("-> time=%"PRId64", ingesters=%d", pithy_print->get_age(), (int)ingesters.size()); srs_trace("-> time=%"PRId64", ingesters=%d", pithy_print->age(), (int)ingesters.size());
} }
int SrsIngester::on_reload_vhost_added(string vhost) int SrsIngester::on_reload_vhost_added(string vhost)

View file

@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#define SRS_STAGE_DEFAULT_INTERVAL_MS 1200 #define SRS_STAGE_DEFAULT_INTERVAL_MS 1200
@ -75,6 +76,10 @@ struct SrsStageInfo : public ISrsReloadHandler
pithy_print_time_ms = _srs_config->get_pithy_print_ingester(); pithy_print_time_ms = _srs_config->get_pithy_print_ingester();
break; break;
} }
case SRS_STAGE_EDGE: {
pithy_print_time_ms = _srs_config->get_pithy_print_edge();
break;
}
case SRS_STAGE_HLS: { case SRS_STAGE_HLS: {
pithy_print_time_ms = _srs_config->get_pithy_print_hls(); pithy_print_time_ms = _srs_config->get_pithy_print_hls();
break; break;
@ -98,7 +103,8 @@ SrsPithyPrint::SrsPithyPrint(int _stage_id)
{ {
stage_id = _stage_id; stage_id = _stage_id;
client_id = enter_stage(); client_id = enter_stage();
printed_age = age = 0; previous_tick = srs_get_system_time_ms();
printed_age = _age = 0;
} }
SrsPithyPrint::~SrsPithyPrint() SrsPithyPrint::~SrsPithyPrint()
@ -138,9 +144,12 @@ void SrsPithyPrint::leave_stage()
stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms); stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
} }
void SrsPithyPrint::elapse(int64_t time_ms) void SrsPithyPrint::elapse()
{ {
age += time_ms; int64_t diff = srs_get_system_time_ms() - previous_tick;
_age += srs_max(0, diff);
previous_tick = srs_get_system_time_ms();
} }
bool SrsPithyPrint::can_print() bool SrsPithyPrint::can_print()
@ -148,24 +157,19 @@ bool SrsPithyPrint::can_print()
SrsStageInfo* stage = _srs_stages[stage_id]; SrsStageInfo* stage = _srs_stages[stage_id];
srs_assert(stage != NULL); srs_assert(stage != NULL);
int64_t alive_age = age - printed_age; int64_t alive_age = _age - printed_age;
int64_t can_print_age = stage->nb_clients * stage->pithy_print_time_ms; int64_t can_print_age = stage->nb_clients * stage->pithy_print_time_ms;
bool can_print = alive_age >= can_print_age; bool can_print = alive_age >= can_print_age;
if (can_print) { if (can_print) {
printed_age = age; printed_age = _age;
} }
return can_print; return can_print;
} }
int64_t SrsPithyPrint::get_age() int64_t SrsPithyPrint::age()
{ {
return age; return _age;
}
void SrsPithyPrint::set_age(int64_t _age)
{
age = _age;
} }

View file

@ -42,6 +42,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_HLS 5 #define SRS_STAGE_HLS 5
// the pithy stage for all ingesters. // the pithy stage for all ingesters.
#define SRS_STAGE_INGESTER 6 #define SRS_STAGE_INGESTER 6
// the pithy stage for all edge.
#define SRS_STAGE_EDGE 7
/** /**
* the stage is used for a collection of object to do print, * the stage is used for a collection of object to do print,
@ -55,8 +57,9 @@ private:
int client_id; int client_id;
int stage_id; int stage_id;
// in ms. // in ms.
int64_t age; int64_t _age;
int64_t printed_age; int64_t printed_age;
int64_t previous_tick;
public: public:
/** /**
* @param _stage_id defined in SRS_STAGE_xxx, eg. SRS_STAGE_PLAY_USER. * @param _stage_id defined in SRS_STAGE_xxx, eg. SRS_STAGE_PLAY_USER.
@ -74,9 +77,9 @@ private:
virtual void leave_stage(); virtual void leave_stage();
public: public:
/** /**
* specified client elapse some time. * auto calc the elapse time
*/ */
virtual void elapse(int64_t time_ms); virtual void elapse();
/** /**
* whether current client can print. * whether current client can print.
*/ */
@ -84,8 +87,7 @@ public:
/** /**
* get the elapsed time in ms. * get the elapsed time in ms.
*/ */
virtual int64_t get_age(); virtual int64_t age();
virtual void set_age(int64_t _age);
}; };
#endif #endif

View file

@ -297,8 +297,8 @@ int SrsRtmpConn::stream_service_cycle()
srs_verbose("start to play stream %s.", req->stream.c_str()); srs_verbose("start to play stream %s.", req->stream.c_str());
if (vhost_is_edge) { if (vhost_is_edge) {
if ((ret = source->on_edge_play_stream()) != ERROR_SUCCESS) { if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) {
srs_error("notice edge play stream failed. ret=%d", ret); srs_error("notice edge start play stream failed. ret=%d", ret);
return ret; return ret;
} }
} }
@ -311,9 +311,11 @@ int SrsRtmpConn::stream_service_cycle()
srs_error("http hook on_play failed. ret=%d", ret); srs_error("http hook on_play failed. ret=%d", ret);
return ret; return ret;
} }
srs_info("start to play stream %s success", req->stream.c_str()); srs_info("start to play stream %s success", req->stream.c_str());
ret = playing(source); ret = playing(source);
on_stop(); on_stop();
return ret; return ret;
} }
case SrsRtmpConnFMLEPublish: { case SrsRtmpConnFMLEPublish: {
@ -423,11 +425,11 @@ int SrsRtmpConn::playing(SrsSource* source)
int64_t starttime = -1; int64_t starttime = -1;
while (true) { while (true) {
pithy_print.elapse(SRS_PULSE_TIMEOUT_US / 1000);
// switch to other st-threads. // switch to other st-threads.
st_usleep(0); st_usleep(0);
pithy_print.elapse();
// read from client. // read from client.
int ctl_msg_ret = ERROR_SUCCESS; int ctl_msg_ret = ERROR_SUCCESS;
if (true) { if (true) {
@ -460,7 +462,7 @@ int SrsRtmpConn::playing(SrsSource* source)
// reportable // reportable
if (pithy_print.can_print()) { if (pithy_print.can_print()) {
srs_trace("-> time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", srs_trace("-> time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.get_age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
} }
if (count <= 0) { if (count <= 0) {
@ -531,14 +533,15 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
return ret; return ret;
} }
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg, false); SrsAutoFree(SrsCommonMessage, msg, false);
pithy_print.set_age(msg->header.timestamp); pithy_print.elapse();
// reportable // reportable
if (pithy_print.can_print()) { if (pithy_print.can_print()) {
srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
} }
// process UnPublish event. // process UnPublish event.
@ -604,12 +607,12 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
SrsAutoFree(SrsCommonMessage, msg, false); SrsAutoFree(SrsCommonMessage, msg, false);
pithy_print.set_age(msg->header.timestamp); pithy_print.elapse();
// reportable // reportable
if (pithy_print.can_print()) { if (pithy_print.can_print()) {
srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
} }
// process UnPublish event. // process UnPublish event.

View file

@ -515,7 +515,7 @@ int SrsSource::initialize()
} }
#endif #endif
if ((ret = edge->initialize(_req)) != ERROR_SUCCESS) { if ((ret = edge->initialize(this, _req)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -1168,6 +1168,10 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
consumers.erase(it); consumers.erase(it);
} }
srs_info("handle consumer destroy success."); srs_info("handle consumer destroy success.");
if (consumers.empty()) {
edge->on_all_client_stop();
}
} }
void SrsSource::set_cache(bool enabled) void SrsSource::set_cache(bool enabled)
@ -1180,7 +1184,7 @@ bool SrsSource::is_atc()
return atc; return atc;
} }
int SrsSource::on_edge_play_stream() int SrsSource::on_edge_start_play()
{ {
return edge->on_client_play(); return edge->on_client_play();
} }

View file

@ -313,7 +313,7 @@ public:
// for consumer, atc feature. // for consumer, atc feature.
virtual bool is_atc(); virtual bool is_atc();
// for edge, when play edge stream, check the state // for edge, when play edge stream, check the state
virtual int on_edge_play_stream(); virtual int on_edge_start_play();
private: private:
virtual int create_forwarders(); virtual int create_forwarders();
virtual void destroy_forwarders(); virtual void destroy_forwarders();

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR "0" #define VERSION_MAJOR "0"
#define VERSION_MINOR "9" #define VERSION_MINOR "9"
#define VERSION_REVISION "76" #define VERSION_REVISION "77"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info. // server info.
#define RTMP_SIG_SRS_KEY "srs" #define RTMP_SIG_SRS_KEY "srs"

View file

@ -337,6 +337,11 @@ SrsRtmpClient::~SrsRtmpClient()
srs_freep(hs_bytes); srs_freep(hs_bytes);
} }
SrsProtocol* SrsRtmpClient::get_protocol()
{
return protocol;
}
void SrsRtmpClient::set_recv_timeout(int64_t timeout_us) void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)
{ {
protocol->set_recv_timeout(timeout_us); protocol->set_recv_timeout(timeout_us);

View file

@ -155,6 +155,7 @@ public:
SrsRtmpClient(ISrsProtocolReaderWriter* skt); SrsRtmpClient(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpClient(); virtual ~SrsRtmpClient();
public: public:
virtual SrsProtocol* get_protocol();
virtual void set_recv_timeout(int64_t timeout_us); virtual void set_recv_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t timeout_us); virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes(); virtual int64_t get_recv_bytes();