diff --git a/README.md b/README.md index 4e4b50a36..b6c38d027 100755 --- a/README.md +++ b/README.md @@ -289,6 +289,7 @@ See also: [Performance Test Guide](https://github.com/winlinvip/simple-rtmp-serv * nginx v1.5.0: 139524 lines
### History +* v1.0, 2014-01-11, fix jw/flower player pause bug, which send closeStream actually. * v1.0, 2014-01-05, add wiki [Build](https://github.com/winlinvip/simple-rtmp-server/wiki/Build), [Performance](https://github.com/winlinvip/simple-rtmp-server/wiki/Performance), [Cluster](https://github.com/winlinvip/simple-rtmp-server/wiki/Cluster) * v1.0, 2014-01-01, change listen(512), chunk-size(60000), to improve performance. * v1.0, 2013-12-27, merge from wenjie, the bandwidth test feature. diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 3d5dc6c4d..d6730d888 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -168,6 +168,41 @@ int SrsClient::service_cycle() } srs_verbose("on_bw_done success"); + while (true) { + ret = stream_service_cycle(); + + // stream service must terminated with error, never success. + srs_assert(ret != ERROR_SUCCESS); + + // when not system control error, fatal error, return. + if (!srs_is_system_control_error(ret)) { + srs_error("stream service cycle failed. ret=%d", ret); + return ret; + } + + // for "some" system control error, + // logical accept and retry stream service. + if (ret == ERROR_CONTROL_RTMP_CLOSE) { + // set timeout to a larger value, for user paused. + rtmp->set_recv_timeout(SRS_PAUSED_SEND_TIMEOUT_US); + rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US); + + srs_trace("control message(close) accept, retry stream service."); + continue; + } + + // for other system control message, fatal error. + srs_error("control message(%d) reject as error. ret=%d", ret, ret); + return ret; + } + + return ret; +} + +int SrsClient::stream_service_cycle() +{ + int ret = ERROR_SUCCESS; + SrsClientType type; if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { srs_error("identify client failed. ret=%d", ret); @@ -175,7 +210,12 @@ int SrsClient::service_cycle() } req->strip(); srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); + + // client is identified, set the timeout to service timeout. + rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); + rtmp->set_send_timeout(SRS_SEND_TIMEOUT_US); + // set timeout to larger. int chunk_size = config->get_chunk_size(req->vhost); if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret); @@ -341,7 +381,9 @@ int SrsClient::playing(SrsSource* source) return ret; } if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { - srs_error("process play control message failed. ret=%d", ret); + if (!srs_is_system_control_error(ret)) { + srs_error("process play control message failed. ret=%d", ret); + } return ret; } } @@ -555,6 +597,13 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* } srs_info("decode the amf0/amf3 command packet success."); + SrsCloseStreamPacket* close = dynamic_cast(msg->get_packet()); + if (close) { + ret = ERROR_CONTROL_RTMP_CLOSE; + srs_trace("system control message: rtmp close stream. ret=%d", ret); + return ret; + } + SrsPausePacket* pause = dynamic_cast(msg->get_packet()); if (!pause) { srs_info("ignore all amf0/amf3 command except pause."); @@ -700,4 +749,3 @@ void SrsClient::on_stop() } #endif } - diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 3d93f4123..fe1d1475e 100644 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -71,6 +71,8 @@ public: private: // when valid and connected to vhost/app, service the client. virtual int service_cycle(); + // stream(play/publish) service cycle, identify client first. + virtual int stream_service_cycle(); virtual int check_vhost(); virtual int playing(SrsSource* source); virtual int publish(SrsSource* source, bool is_fmle); @@ -86,4 +88,4 @@ private: virtual void on_stop(); }; -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_error.cpp b/trunk/src/core/srs_core_error.cpp index 3163d0036..226a61027 100644 --- a/trunk/src/core/srs_core_error.cpp +++ b/trunk/src/core/srs_core_error.cpp @@ -22,3 +22,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include + +bool srs_is_system_control_error(int error_code) +{ + return error_code == ERROR_CONTROL_RTMP_CLOSE; +} + diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 587491579..e08564776 100644 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -147,4 +147,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_DATA_INVLIAD 801 #define ERROR_HTTP_PARSE_HEADER 802 -#endif \ No newline at end of file +// system control message, +// not an error, but special control logic. +// sys ctl: rtmp close stream, support replay. +#define ERROR_CONTROL_RTMP_CLOSE 900 + +/** +* whether the error code is an system control error. +*/ +extern bool srs_is_system_control_error(int error_code); + +#endif diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 92e150065..d8a032352 100644 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -196,6 +196,7 @@ messages. */ #define RTMP_AMF0_COMMAND_CONNECT "connect" #define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream" +#define RTMP_AMF0_COMMAND_CLOSE_STREAM "closeStream" #define RTMP_AMF0_COMMAND_PLAY "play" #define RTMP_AMF0_COMMAND_PAUSE "pause" #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" @@ -1363,6 +1364,10 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_info("decode the AMF0/AMF3 band width check message."); packet = new SrsBandwidthPacket(); return packet->decode(stream); + } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) { + srs_info("decode the AMF0/AMF3 closeStream message."); + packet = new SrsCloseStreamPacket(); + return packet->decode(stream); } // default packet to drop message. @@ -2064,6 +2069,41 @@ int SrsCreateStreamResPacket::encode_packet(SrsStream* stream) return ret; } +SrsCloseStreamPacket::SrsCloseStreamPacket() +{ + command_name = RTMP_AMF0_COMMAND_CLOSE_STREAM; + transaction_id = 0; + command_object = new SrsAmf0Null(); +} + +SrsCloseStreamPacket::~SrsCloseStreamPacket() +{ + srs_freep(command_object); +} + +int SrsCloseStreamPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode closeStream command_name failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode closeStream transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode closeStream command_object failed. ret=%d", ret); + return ret; + } + srs_info("amf0 decode closeStream packet success"); + + return ret; +} + SrsFMLEStartPacket::SrsFMLEStartPacket() { command_name = RTMP_AMF0_COMMAND_CREATE_STREAM; @@ -3325,4 +3365,3 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream) return ret; } - diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index ca6602c67..68b77c7f1 100644 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -56,6 +56,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // if timeout, close the connection. #define SRS_RECV_TIMEOUT_US 30*1000*1000L +// the timeout to wait client data, when client paused +// if timeout, close the connection. +#define SRS_PAUSED_SEND_TIMEOUT_US 30*60*1000*1000L + +// the timeout to send data to client, when client paused +// if timeout, close the connection. +#define SRS_PAUSED_RECV_TIMEOUT_US 30*60*1000*1000L + // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, // sleep a while and close the connection. @@ -625,6 +633,28 @@ protected: virtual int get_size(); virtual int encode_packet(SrsStream* stream); }; +/** +* client close stream packet. +*/ +class SrsCloseStreamPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsCloseStreamPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; +public: + SrsCloseStreamPacket(); + virtual ~SrsCloseStreamPacket(); +public: + virtual int decode(SrsStream* stream); +}; /** * FMLE start publish: ReleaseStream/PublishStream @@ -1223,4 +1253,4 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** return ret; } -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index bf67fb9fc..26e6cd30b 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -171,7 +171,7 @@ void SrsRequest::strip() trim(stream, "/ \n\r\t"); } -std::string& SrsRequest::trim(string& str, string chs) +string& SrsRequest::trim(string& str, string chs) { for (int i = 0; i < (int)chs.length(); i++) { char ch = chs.at(i); @@ -695,7 +695,7 @@ int SrsRtmp::on_bw_done() return ret; } -int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name) +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, string& stream_name) { type = SrsClientUnknown; int ret = ERROR_SUCCESS; @@ -723,13 +723,15 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { srs_info("identify client by create stream, play or flash publish."); - return identify_create_stream_client( - dynamic_cast(pkt), stream_id, type, stream_name); + return identify_create_stream_client(dynamic_cast(pkt), stream_id, type, stream_name); } if (dynamic_cast(pkt)) { srs_info("identify client by releaseStream, fmle publish."); - return identify_fmle_publish_client( - dynamic_cast(pkt), type, stream_name); + return identify_fmle_publish_client(dynamic_cast(pkt), type, stream_name); + } + if (dynamic_cast(pkt)) { + srs_info("level0 identify client by play."); + return identify_play_client(dynamic_cast(pkt), type, stream_name); } srs_trace("ignore AMF0/AMF3 command message."); @@ -1165,16 +1167,12 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { - SrsPlayPacket* play = dynamic_cast(pkt); - type = SrsClientPlay; - stream_name = play->stream_name; - srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); - return ret; + srs_info("level1 identify client by play."); + return identify_play_client(dynamic_cast(pkt), type, stream_name); } if (dynamic_cast(pkt)) { srs_info("identify client by publish, falsh publish."); - return identify_flash_publish_client( - dynamic_cast(pkt), type, stream_name); + return identify_flash_publish_client(dynamic_cast(pkt), type, stream_name); } srs_trace("ignore AMF0/AMF3 command message."); @@ -1216,3 +1214,16 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& return ret; } + +int SrsRtmp::identify_play_client(SrsPlayPacket* req, SrsClientType& type, string& stream_name) +{ + int ret = ERROR_SUCCESS; + + type = SrsClientPlay; + stream_name = req->stream_name; + + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); + + return ret; +} + diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 54b0948af..bd39e550a 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -40,6 +40,7 @@ class SrsFMLEStartPacket; class SrsPublishPacket; class SrsSharedPtrMessage; class SrsOnMetaDataPacket; +class SrsPlayPacket; /** * the original request from client. @@ -226,6 +227,8 @@ private: virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name); virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name); +private: + virtual int identify_play_client(SrsPlayPacket* req, SrsClientType& type, std::string& stream_name); }; -#endif \ No newline at end of file +#endif