diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index a676e8d5a..57ee6d08a 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -28,15 +28,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include - -// default stream id for response the createStream request. -#define SRS_DEFAULT_SID 1 +#include +#include SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { ip = NULL; req = new SrsRequest(); + res = new SrsResponse(); rtmp = new SrsRtmp(client_stfd); } @@ -44,6 +44,7 @@ SrsClient::~SrsClient() { srs_freepa(ip); srs_freep(req); + srs_freep(res); srs_freep(rtmp); } @@ -97,14 +98,13 @@ int SrsClient::do_cycle() } srs_verbose("on_bw_done success"); - int stream_id = SRS_DEFAULT_SID; SrsClientType type; - if ((ret = rtmp->identify_client(stream_id, type, req->stream)) != ERROR_SUCCESS) { + if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { srs_error("identify client failed. ret=%d", ret); return ret; } srs_verbose("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); - + // TODO: read from config. int chunk_size = 4096; if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { @@ -117,13 +117,23 @@ int SrsClient::do_cycle() case SrsClientPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); - if ((ret = rtmp->start_play(stream_id)) != ERROR_SUCCESS) { + if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; } srs_info("start to play stream %s success", req->stream.c_str()); return streaming_play(); } + case SrsClientPublish: { + srs_verbose("start to publish stream %s.", req->stream.c_str()); + + if ((ret = rtmp->start_publish(res->stream_id)) != ERROR_SUCCESS) { + srs_error("start to publish stream failed. ret=%d", ret); + return ret; + } + srs_info("start to publish stream %s success", req->stream.c_str()); + return streaming_publish(); + } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; srs_info("invalid client type=%d. ret=%d", type, ret); @@ -140,6 +150,39 @@ int SrsClient::streaming_play() return ret; } +int SrsClient::streaming_publish() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsMessage* msg = NULL; + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv identify client message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsMessage, msg, false); + + // process UnPublish event. + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("decode unpublish message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); + return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id); + } + + srs_trace("ignore AMF0/AMF3 command message."); + } + } + + return ret; +} + int SrsClient::get_peer_ip() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 266745a4a..a212ef8e7 100755 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsRtmp; class SrsRequest; +class SrsResponse; /** * the client provides the main logic control for RTMP clients. @@ -43,6 +44,7 @@ class SrsClient : public SrsConnection private: char* ip; SrsRequest* req; + SrsResponse* res; SrsRtmp* rtmp; public: SrsClient(SrsServer* srs_server, st_netfd_t client_stfd); @@ -51,6 +53,7 @@ protected: virtual int do_cycle(); private: virtual int streaming_play(); + virtual int streaming_publish(); virtual int get_peer_ip(); }; diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 06a39507e..5b4be3928 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -199,6 +199,7 @@ messages. #define RTMP_AMF0_COMMAND_RESULT "_result" #define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" #define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" +#define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish" #define RTMP_AMF0_COMMAND_PUBLISH "publish" #define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess" @@ -979,6 +980,10 @@ int SrsMessage::decode_packet() srs_info("decode the AMF0/AMF3 command(publish message)."); packet = new SrsPublishPacket(); return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) { + srs_info("decode the AMF0/AMF3 command(unpublish message)."); + packet = new SrsFMLEStartPacket(); + return packet->decode(stream); } // default packet to drop message. @@ -1383,7 +1388,8 @@ int SrsFMLEStartPacket::decode(SrsStream* stream) } if (command_name.empty() || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM - && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH) + && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH + && command_name != RTMP_AMF0_COMMAND_UNPUBLISH) ) { ret = ERROR_RTMP_AMF0_DECODE; srs_error("amf0 decode FMLE start command_name failed. " diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 408dd9c40..2dc51ca62 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -53,9 +53,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define StatusCodeStreamStart "NetStream.Play.Start" #define StatusCodePublishStart "NetStream.Publish.Start" #define StatusCodeDataStart "NetStream.Data.Start" +#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success" // FMLE #define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish" +#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish" + +// default stream id for response the createStream request. +#define SRS_DEFAULT_SID 1 SrsRequest::SrsRequest() { @@ -107,6 +112,15 @@ int SrsRequest::discovery_app() return ret; } +SrsResponse::SrsResponse() +{ + stream_id = SRS_DEFAULT_SID; +} + +SrsResponse::~SrsResponse() +{ +} + SrsRtmp::SrsRtmp(st_netfd_t client_stfd) { protocol = new SrsProtocol(client_stfd); @@ -164,6 +178,11 @@ int SrsRtmp::handshake() return ret; } +int SrsRtmp::recv_message(SrsMessage** pmsg) +{ + return protocol->recv_message(pmsg); +} + int SrsRtmp::connect_app(SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -329,7 +348,7 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st if (dynamic_cast(pkt)) { srs_info("identify client by releaseStream, fmle publish."); return identify_fmle_publish_client( - dynamic_cast(pkt), stream_id, type, stream_name); + dynamic_cast(pkt), type, stream_name); } srs_trace("ignore AMF0/AMF3 command message."); @@ -452,79 +471,10 @@ int SrsRtmp::start_play(int stream_id) return ret; } -int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) +int SrsRtmp::start_publish(int stream_id) { int ret = ERROR_SUCCESS; - if (true) { - SrsMessage* msg = new SrsMessage(); - SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { - srs_error("send createStream response message failed. ret=%d", ret); - return ret; - } - srs_info("send createStream response message success."); - } - - while (true) { - SrsMessage* msg = NULL; - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv identify client message failed. ret=%d", ret); - return ret; - } - - SrsAutoFree(SrsMessage, msg, false); - - if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { - srs_trace("identify ignore messages except " - "AMF0/AMF3 command message. type=%#x", msg->header.message_type); - continue; - } - - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { - srs_error("identify decode message failed. ret=%d", ret); - return ret; - } - - SrsPacket* pkt = msg->get_packet(); - if (dynamic_cast(pkt)) { - SrsPlayPacket* play = dynamic_cast(pkt); - type = SrsClientPlay; - stream_name = play->stream_name; - srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); - return ret; - } - - srs_trace("ignore AMF0/AMF3 command message."); - } - - return ret; -} - -int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) -{ - int ret = ERROR_SUCCESS; - - type = SrsClientPublish; - stream_name = req->stream_name; - - // createStream response - if (true) { - SrsMessage* msg = new SrsMessage(); - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { - srs_error("send releaseStream response message failed. ret=%d", ret); - return ret; - } - srs_info("send releaseStream response message success."); - } - // FCPublish double fc_publish_tid = 0; if (true) { @@ -629,6 +579,84 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id srs_info("send onStatus(NetStream.Publish.Start) message success."); } + return ret; +} + +int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid) +{ + int ret = ERROR_SUCCESS; + + // publish response onFCUnpublish(NetStream.unpublish.Success) + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH; + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess)); + pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream.")); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret); + return ret; + } + srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success."); + } + // FCUnpublish response + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FCUnpublish response message failed. ret=%d", ret); + return ret; + } + srs_info("send FCUnpublish response message success."); + } + // publish response onStatus(NetStream.Unpublish.Success) + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess)); + pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished")); + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID)); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret); + return ret; + } + srs_info("send onStatus(NetStream.Unpublish.Success) message success."); + } + + srs_info("FMLE unpublish success."); + + return ret; +} + +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) +{ + int ret = ERROR_SUCCESS; + + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send createStream response message failed. ret=%d", ret); + return ret; + } + srs_info("send createStream response message success."); + } + while (true) { SrsMessage* msg = NULL; if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { @@ -637,6 +665,52 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id } SrsAutoFree(SrsMessage, msg, false); + + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { + srs_trace("identify ignore messages except " + "AMF0/AMF3 command message. type=%#x", msg->header.message_type); + continue; + } + + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("identify decode message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + SrsPlayPacket* play = dynamic_cast(pkt); + type = SrsClientPlay; + stream_name = play->stream_name; + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); + return ret; + } + + srs_trace("ignore AMF0/AMF3 command message."); + } + + return ret; +} + +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name) +{ + int ret = ERROR_SUCCESS; + + type = SrsClientPublish; + stream_name = req->stream_name; + + // releaseStream response + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send releaseStream response message failed. ret=%d", ret); + return ret; + } + srs_info("send releaseStream response message success."); } return ret; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 14f0fd724..fdb496f56 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsProtocol; +class SrsMessage; class SrsCreateStreamPacket; class SrsFMLEStartPacket; @@ -63,6 +64,17 @@ struct SrsRequest virtual int discovery_app(); }; +/** +* the response to client. +*/ +struct SrsResponse +{ + int stream_id; + + SrsResponse(); + virtual ~SrsResponse(); +}; + /** * the rtmp client type. */ @@ -88,6 +100,7 @@ public: virtual ~SrsRtmp(); public: virtual int handshake(); + virtual int recv_message(SrsMessage** pmsg); virtual int connect_app(SrsRequest* req); virtual int set_window_ack_size(int ack_size); /** @@ -116,9 +129,24 @@ public: * onStatus(NetStream.Data.Start). */ virtual int start_play(int stream_id); + /** + * when client type is publish, response with packets: + * releaseStream response + * FCPublish + * FCPublish response + * createStream response + * onFCPublish(NetStream.Publish.Start) + * onStatus(NetStream.Publish.Start) + */ + virtual int start_publish(int stream_id); + /** + * process the FMLE unpublish event. + * @unpublish_tid the unpublish request transaction id. + */ + virtual int fmle_unpublish(int stream_id, double unpublish_tid); private: virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); - virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); + virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name); }; #endif \ No newline at end of file