diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 5faf13b05..f35bdac86 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -893,20 +893,23 @@ vhost refer.anti_suck.com { pithy_print { # shared print interval for all publish clients, in milliseconds. # if not specified, set to 1100. - publish 2000; + publish 10000; # shared print interval for all play clients, in milliseconds. # if not specified, set to 1300. - play 3000; + play 10000; # shared print interval for all forwarders, in milliseconds. # if not specified, set to 2000. - forwarder 3000; + forwarder 10000; # shared print interval for all encoders, in milliseconds. # if not specified, set to 2000. - encoder 3000; + encoder 10000; # shared print interval for all ingesters, in milliseconds. # if not specified, set to 2000. - ingester 3000; + ingester 10000; # shared print interval for all hls, in milliseconds. # if not specified, set to 2000. - hls 3000; + hls 10000; + # shared print interval for all edge, in milliseconds. + # if not specified, set to 2000. + edge 10000; } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 236df8984..6929c8cd5 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1396,6 +1396,21 @@ int SrsConfig::get_pithy_print_play() return ::atoi(pithy->arg0().c_str()); } +int SrsConfig::get_pithy_print_edge() +{ + SrsConfDirective* pithy = root->get("pithy_print"); + if (!pithy) { + return SRS_STAGE_EDGE_INTERVAL_MS; + } + + pithy = pithy->get("edge"); + if (!pithy) { + return SRS_STAGE_EDGE_INTERVAL_MS; + } + + return ::atoi(pithy->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost) { srs_assert(root); @@ -1821,6 +1836,17 @@ bool SrsConfig::get_vhost_is_edge(std::string vhost) return true; } +SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + return conf->get("origin"); +} + SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 70ec317f6..0c13cdb36 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -76,6 +76,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_STAGE_ENCODER_INTERVAL_MS 2000 #define SRS_STAGE_INGESTER_INTERVAL_MS 2000 #define SRS_STAGE_HLS_INTERVAL_MS 2000 +#define SRS_STAGE_EDGE_INTERVAL_MS 2000 #define SRS_AUTO_INGEST_TYPE_FILE "file" #define SRS_AUTO_INGEST_TYPE_STREAM "stream" @@ -163,6 +164,7 @@ public: virtual int get_pithy_print_ingester(); virtual int get_pithy_print_hls(); virtual int get_pithy_print_play(); + virtual int get_pithy_print_edge(); // vhost specified section public: virtual SrsConfDirective* get_vhost(std::string vhost); @@ -193,6 +195,7 @@ public: // vhost edge section public: virtual bool get_vhost_is_edge(std::string vhost); + virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost); // vhost transcode section public: virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 726449a7f..521bb2d11 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -23,28 +23,55 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +#include +#include +#include + #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // when error, edge ingester sleep for a while and retry. -#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) +#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) + +// when edge timeout, retry next. +#define SRS_EDGE_TIMEOUT_US (int64_t)(3*1000*1000LL) SrsEdgeIngester::SrsEdgeIngester() { + io = NULL; + client = NULL; _edge = NULL; _req = NULL; + origin_index = 0; + stream_id = 0; + stfd = NULL; pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US); } SrsEdgeIngester::~SrsEdgeIngester() { + stop(); + + srs_freep(pthread); } -int SrsEdgeIngester::initialize(SrsEdge* edge, SrsRequest* req) +int SrsEdgeIngester::initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req) { int ret = ERROR_SUCCESS; + _source = source; _edge = edge; _req = req; @@ -53,14 +80,229 @@ int SrsEdgeIngester::initialize(SrsEdge* edge, SrsRequest* req) int SrsEdgeIngester::start() { - int ret = ERROR_SUCCESS; - return ret; - //return pthread->start(); + return pthread->start(); +} + +void SrsEdgeIngester::stop() +{ + pthread->stop(); + + close_underlayer_socket(); + + srs_freep(client); + srs_freep(io); } int SrsEdgeIngester::cycle() { int ret = ERROR_SUCCESS; + + if ((ret = connect_server()) != ERROR_SUCCESS) { + return ret; + } + srs_assert(client); + + client->set_recv_timeout(SRS_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_SEND_TIMEOUT_US); + + SrsRequest* req = _req; + + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->connect_app(req->app, req->tcUrl)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + if ((ret = _source->on_publish()) != ERROR_SUCCESS) { + srs_error("edge ingester play stream then publish to edge failed. ret=%d", ret); + return ret; + } + + if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = ingest()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsEdgeIngester::ingest() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_EDGE_TIMEOUT_US); + + SrsPithyPrint pithy_print(SRS_STAGE_EDGE); + + while (pthread->can_loop()) { + // switch to other st-threads. + st_usleep(0); + + pithy_print.elapse(); + + // pithy print + if (pithy_print.can_print()) { + srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + } + + // read from client. + SrsCommonMessage* msg = NULL; + if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv origin server message failed. ret=%d", ret); + return ret; + } + srs_verbose("edge loop recv message. ret=%d", ret); + + srs_assert(msg); + SrsAutoFree(SrsCommonMessage, msg, false); + + if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + +int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) +{ + int ret = ERROR_SUCCESS; + + SrsSource* source = _source; + + // process audio packet + if (msg->header.is_audio()) { + if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) { + srs_error("source process audio message failed. ret=%d", ret); + return ret; + } + } + + // process video packet + if (msg->header.is_video()) { + if ((ret = source->on_video(msg)) != ERROR_SUCCESS) { + srs_error("source process video message failed. ret=%d", ret); + return ret; + } + } + + // process onMetaData + if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { + if ((ret = msg->decode_packet(client->get_protocol())) != ERROR_SUCCESS) { + srs_error("decode onMetaData message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); + if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) { + srs_error("source process onMetaData message failed. ret=%d", ret); + return ret; + } + srs_trace("process onMetaData message success."); + return ret; + } + + srs_trace("ignore AMF0/AMF3 data message."); + return ret; + } + + return ret; +} + +void SrsEdgeIngester::close_underlayer_socket() +{ + srs_close_stfd(stfd); +} + +int SrsEdgeIngester::connect_server() +{ + int ret = ERROR_SUCCESS; + + // reopen + close_underlayer_socket(); + + // TODO: FIXME: support reload + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); + srs_assert(conf); + + // select the origin. + std::string server = conf->args.at(origin_index % conf->args.size()); + origin_index = (origin_index + 1) % conf->args.size(); + + std::string s_port = RTMP_DEFAULT_PORT; + int port = ::atoi(RTMP_DEFAULT_PORT); + size_t pos = server.find(":"); + if (pos != std::string::npos) { + s_port = server.substr(pos + 1); + server = server.substr(0, pos); + port = ::atoi(s_port.c_str()); + } + + // open socket. + srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d", + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); + + // TODO: FIXME: extract utility method + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + srs_assert(!stfd); + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + srs_freep(client); + srs_freep(io); + + io = new SrsSocket(stfd); + client = new SrsRtmpClient(io); + + // connect to server. + std::string ip = srs_dns_resolve(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + return ret; } @@ -75,11 +317,11 @@ SrsEdge::~SrsEdge() srs_freep(ingester); } -int SrsEdge::initialize(SrsRequest* req) +int SrsEdge::initialize(SrsSource* source, SrsRequest* req) { int ret = ERROR_SUCCESS; - if ((ret = ingester->initialize(this, req)) != ERROR_SUCCESS) { + if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) { return ret; } @@ -106,3 +348,25 @@ int SrsEdge::on_client_play() return ret; } +void SrsEdge::on_all_client_stop() +{ + if (state == SrsEdgeStateIngestConnected) { + ingester->stop(); + } + + SrsEdgeState pstate = state; + state = SrsEdgeStateInit; + srs_trace("edge change from %d to state %d (init).", pstate, state); +} + +int SrsEdge::on_ingest_play() +{ + int ret = ERROR_SUCCESS; + + SrsEdgeState pstate = state; + state = SrsEdgeStateIngestConnected; + + srs_trace("edge change from %d to state %d (ingest connected).", pstate, state); + + return ret; +} diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 130c0ef56..92bc5b55f 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -30,10 +30,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include #include class SrsEdge; +class SrsSource; class SrsRequest; +class SrsRtmpClient; +class SrsCommonMessage; +class ISrsProtocolReaderWriter; /** * the state of edge @@ -43,7 +48,10 @@ enum SrsEdgeState SrsEdgeStateInit = 0, SrsEdgeStatePlay = 100, SrsEdgeStatePublish, - SrsEdgeStateConnected, + // play stream from origin, ingest stream + SrsEdgeStateIngestConnected, + // publish stream to edge, forward to origin + SrsEdgeStateForwardConnected, SrsEdgeStateAborting, SrsEdgeStateReloading, }; @@ -54,18 +62,31 @@ enum SrsEdgeState class SrsEdgeIngester : public ISrsThreadHandler { private: + int stream_id; +private: + SrsSource* _source; SrsEdge* _edge; SrsRequest* _req; SrsThread* pthread; + st_netfd_t stfd; + ISrsProtocolReaderWriter* io; + SrsRtmpClient* client; + int origin_index; public: SrsEdgeIngester(); virtual ~SrsEdgeIngester(); public: - virtual int initialize(SrsEdge* edge, SrsRequest* req); + virtual int initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req); virtual int start(); + virtual void stop(); // interface ISrsThreadHandler public: virtual int cycle(); +private: + virtual int ingest(); + virtual void close_underlayer_socket(); + virtual int connect_server(); + virtual int process_publish_message(SrsCommonMessage* msg); }; /** @@ -80,11 +101,20 @@ public: SrsEdge(); virtual ~SrsEdge(); public: - virtual int initialize(SrsRequest* req); + virtual int initialize(SrsSource* source, SrsRequest* req); /** * when client play stream on edge. */ virtual int on_client_play(); + /** + * when all client stopped play, disconnect to origin. + */ + virtual void on_all_client_stop(); +public: + /** + * when ingester start to play stream. + */ + virtual int on_ingest_play(); }; #endif diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 08f3abaa1..3d4c19d9b 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -113,7 +113,7 @@ int SrsEncoder::cycle() // pithy print encoder(); - pithy_print->elapse(SRS_RTMP_ENCODER_SLEEP_US / 1000); + pithy_print->elapse(); return ret; } @@ -326,7 +326,7 @@ void SrsEncoder::encoder() if (pithy_print->can_print()) { // TODO: FIXME: show more info. srs_trace("-> time=%"PRId64", encoders=%d, input=%s", - pithy_print->get_age(), (int)ffmpegs.size(), input_stream_name.c_str()); + pithy_print->age(), (int)ffmpegs.size(), input_stream_name.c_str()); } } diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 52d7b8cc1..8e20bc6fe 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -83,6 +83,7 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) std::string s_port = RTMP_DEFAULT_PORT; port = ::atoi(RTMP_DEFAULT_PORT); + // TODO: FIXME: parse complex params size_t pos = forward_server.find(":"); if (pos != std::string::npos) { s_port = forward_server.substr(pos + 1); @@ -310,6 +311,8 @@ int SrsForwarder::forward() // switch to other st-threads. st_usleep(0); + pithy_print.elapse(); + // read from client. if (true) { SrsCommonMessage* msg = NULL; @@ -330,19 +333,18 @@ int SrsForwarder::forward() return ret; } + // pithy print + if (pithy_print.can_print()) { + srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + } + // ignore when no messages. if (count <= 0) { srs_verbose("no packets to forward."); continue; } SrsAutoFree(SrsSharedPtrMessage*, msgs, true); - - // pithy print - pithy_print.elapse(SRS_PULSE_TIMEOUT_US / 1000); - if (pithy_print.can_print()) { - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); - } // all msgs to forward. for (int i = 0; i < count; i++) { diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 822b99ba7..2980462be 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -1470,10 +1470,10 @@ void SrsHls::hls_mux() { // reportable if (pithy_print->can_print()) { - srs_trace("-> time=%"PRId64"", pithy_print->get_age()); + srs_trace("-> time=%"PRId64"", pithy_print->age()); } - pithy_print->elapse(sample->cts); + pithy_print->elapse(); } #endif diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index a336a6222..3d43ddff8 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -188,7 +188,7 @@ int SrsIngester::cycle() // pithy print ingester(); - pithy_print->elapse(SRS_AUTO_INGESTER_SLEEP_US / 1000); + pithy_print->elapse(); return ret; } @@ -350,7 +350,7 @@ void SrsIngester::ingester() } // TODO: FIXME: show more info. - srs_trace("-> time=%"PRId64", ingesters=%d", pithy_print->get_age(), (int)ingesters.size()); + srs_trace("-> time=%"PRId64", ingesters=%d", pithy_print->age(), (int)ingesters.size()); } int SrsIngester::on_reload_vhost_added(string vhost) diff --git a/trunk/src/app/srs_app_pithy_print.cpp b/trunk/src/app/srs_app_pithy_print.cpp index cca72c687..10dcaff1d 100644 --- a/trunk/src/app/srs_app_pithy_print.cpp +++ b/trunk/src/app/srs_app_pithy_print.cpp @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #define SRS_STAGE_DEFAULT_INTERVAL_MS 1200 @@ -75,6 +76,10 @@ struct SrsStageInfo : public ISrsReloadHandler pithy_print_time_ms = _srs_config->get_pithy_print_ingester(); break; } + case SRS_STAGE_EDGE: { + pithy_print_time_ms = _srs_config->get_pithy_print_edge(); + break; + } case SRS_STAGE_HLS: { pithy_print_time_ms = _srs_config->get_pithy_print_hls(); break; @@ -98,7 +103,8 @@ SrsPithyPrint::SrsPithyPrint(int _stage_id) { stage_id = _stage_id; client_id = enter_stage(); - printed_age = age = 0; + previous_tick = srs_get_system_time_ms(); + printed_age = _age = 0; } SrsPithyPrint::~SrsPithyPrint() @@ -138,9 +144,12 @@ void SrsPithyPrint::leave_stage() stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms); } -void SrsPithyPrint::elapse(int64_t time_ms) +void SrsPithyPrint::elapse() { - age += time_ms; + int64_t diff = srs_get_system_time_ms() - previous_tick; + + _age += srs_max(0, diff); + previous_tick = srs_get_system_time_ms(); } bool SrsPithyPrint::can_print() @@ -148,24 +157,19 @@ bool SrsPithyPrint::can_print() SrsStageInfo* stage = _srs_stages[stage_id]; srs_assert(stage != NULL); - int64_t alive_age = age - printed_age; + int64_t alive_age = _age - printed_age; int64_t can_print_age = stage->nb_clients * stage->pithy_print_time_ms; bool can_print = alive_age >= can_print_age; if (can_print) { - printed_age = age; + printed_age = _age; } return can_print; } -int64_t SrsPithyPrint::get_age() +int64_t SrsPithyPrint::age() { - return age; -} - -void SrsPithyPrint::set_age(int64_t _age) -{ - age = _age; + return _age; } diff --git a/trunk/src/app/srs_app_pithy_print.hpp b/trunk/src/app/srs_app_pithy_print.hpp index df1052c43..4661cc315 100644 --- a/trunk/src/app/srs_app_pithy_print.hpp +++ b/trunk/src/app/srs_app_pithy_print.hpp @@ -42,6 +42,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_STAGE_HLS 5 // the pithy stage for all ingesters. #define SRS_STAGE_INGESTER 6 +// the pithy stage for all edge. +#define SRS_STAGE_EDGE 7 /** * the stage is used for a collection of object to do print, @@ -55,8 +57,9 @@ private: int client_id; int stage_id; // in ms. - int64_t age; + int64_t _age; int64_t printed_age; + int64_t previous_tick; public: /** * @param _stage_id defined in SRS_STAGE_xxx, eg. SRS_STAGE_PLAY_USER. @@ -74,9 +77,9 @@ private: virtual void leave_stage(); public: /** - * specified client elapse some time. + * auto calc the elapse time */ - virtual void elapse(int64_t time_ms); + virtual void elapse(); /** * whether current client can print. */ @@ -84,8 +87,7 @@ public: /** * get the elapsed time in ms. */ - virtual int64_t get_age(); - virtual void set_age(int64_t _age); + virtual int64_t age(); }; #endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 79349ce59..6f6b80c50 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -297,8 +297,8 @@ int SrsRtmpConn::stream_service_cycle() srs_verbose("start to play stream %s.", req->stream.c_str()); if (vhost_is_edge) { - if ((ret = source->on_edge_play_stream()) != ERROR_SUCCESS) { - srs_error("notice edge play stream failed. ret=%d", ret); + if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { + srs_error("notice edge start play stream failed. ret=%d", ret); return ret; } } @@ -311,9 +311,11 @@ int SrsRtmpConn::stream_service_cycle() srs_error("http hook on_play failed. ret=%d", ret); return ret; } + srs_info("start to play stream %s success", req->stream.c_str()); ret = playing(source); on_stop(); + return ret; } case SrsRtmpConnFMLEPublish: { @@ -423,10 +425,10 @@ int SrsRtmpConn::playing(SrsSource* source) int64_t starttime = -1; while (true) { - pithy_print.elapse(SRS_PULSE_TIMEOUT_US / 1000); - // switch to other st-threads. st_usleep(0); + + pithy_print.elapse(); // read from client. int ctl_msg_ret = ERROR_SUCCESS; @@ -460,7 +462,7 @@ int SrsRtmpConn::playing(SrsSource* source) // reportable if (pithy_print.can_print()) { srs_trace("-> time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.get_age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } if (count <= 0) { @@ -531,14 +533,15 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) return ret; } + srs_assert(msg); SrsAutoFree(SrsCommonMessage, msg, false); - pithy_print.set_age(msg->header.timestamp); + pithy_print.elapse(); // reportable if (pithy_print.can_print()) { srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } // process UnPublish event. @@ -604,12 +607,12 @@ int SrsRtmpConn::flash_publish(SrsSource* source) SrsAutoFree(SrsCommonMessage, msg, false); - pithy_print.set_age(msg->header.timestamp); + pithy_print.elapse(); // reportable if (pithy_print.can_print()) { srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } // process UnPublish event. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 668779fee..50c2941a5 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -515,7 +515,7 @@ int SrsSource::initialize() } #endif - if ((ret = edge->initialize(_req)) != ERROR_SUCCESS) { + if ((ret = edge->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } @@ -1168,6 +1168,10 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) consumers.erase(it); } srs_info("handle consumer destroy success."); + + if (consumers.empty()) { + edge->on_all_client_stop(); + } } void SrsSource::set_cache(bool enabled) @@ -1180,7 +1184,7 @@ bool SrsSource::is_atc() return atc; } -int SrsSource::on_edge_play_stream() +int SrsSource::on_edge_start_play() { return edge->on_client_play(); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 56769ae40..fb83df04f 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -313,7 +313,7 @@ public: // for consumer, atc feature. virtual bool is_atc(); // for edge, when play edge stream, check the state - virtual int on_edge_play_stream(); + virtual int on_edge_start_play(); private: virtual int create_forwarders(); virtual void destroy_forwarders(); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 206b03cde..2824e58f3 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "76" +#define VERSION_REVISION "77" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs" diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 1b743c820..982cd4df6 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -337,6 +337,11 @@ SrsRtmpClient::~SrsRtmpClient() srs_freep(hs_bytes); } +SrsProtocol* SrsRtmpClient::get_protocol() +{ + return protocol; +} + void SrsRtmpClient::set_recv_timeout(int64_t timeout_us) { protocol->set_recv_timeout(timeout_us); diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 70b5425a2..d0079fdda 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -155,6 +155,7 @@ public: SrsRtmpClient(ISrsProtocolReaderWriter* skt); virtual ~SrsRtmpClient(); public: + virtual SrsProtocol* get_protocol(); virtual void set_recv_timeout(int64_t timeout_us); virtual void set_send_timeout(int64_t timeout_us); virtual int64_t get_recv_bytes();