diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index 1a3b36649..f3b3e03f2 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -108,7 +108,7 @@ void SrsForwarder::on_unpublish() srs_freep(client); } -int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata) +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) { int ret = ERROR_SUCCESS; return ret; @@ -222,8 +222,13 @@ int SrsForwarder::forward_cycle_imp() srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); return ret; } - if ((ret = client->play_stream(stream_name, stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_name=%s. ret=%d", stream_name.c_str(), ret); + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", + stream_name.c_str(), stream_id, ret); return ret; } diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index e0570705b..0eaf1d236 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -61,7 +61,7 @@ public: public: virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server); virtual void on_unpublish(); - virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_meta_data(SrsSharedPtrMessage* metadata); virtual int on_audio(SrsSharedPtrMessage* msg); virtual int on_video(SrsSharedPtrMessage* msg); private: diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 911deac04..940666f09 100644 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -641,6 +641,15 @@ int SrsProtocol::on_send_message(ISrsMessage* msg) pkt = dynamic_cast(common_msg->get_packet()); if (pkt) { requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT; + break; + } + } + if (true) { + SrsCreateStreamPacket* pkt = NULL; + pkt = dynamic_cast(common_msg->get_packet()); + if (pkt) { + requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CREATE_STREAM; + break; } } break; @@ -1246,6 +1255,10 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_info("decode the AMF0/AMF3 response command(connect vhost/app message)."); packet = new SrsConnectAppResPacket(); return packet->decode(stream); + } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) { + srs_info("decode the AMF0/AMF3 response command(createStream message)."); + packet = new SrsCreateStreamResPacket(0, 0); + return packet->decode(stream); } else { ret = ERROR_RTMP_NO_REQUEST; srs_error("decode AMF0/AMF3 request failed. " @@ -1841,6 +1854,49 @@ int SrsCreateStreamPacket::decode(SrsStream* stream) return ret; } +int SrsCreateStreamPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsCreateStreamPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsCreateStreamPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size(); +} + +int SrsCreateStreamPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + srs_info("encode create stream request packet success."); + + return ret; +} + SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id) { command_name = RTMP_AMF0_COMMAND_RESULT; @@ -1854,6 +1910,41 @@ SrsCreateStreamResPacket::~SrsCreateStreamResPacket() srs_freep(command_object); } +int SrsCreateStreamResPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode createStream command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream command_object failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, stream_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream stream_id failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode createStream response packet success"); + + return ret; +} + int SrsCreateStreamResPacket::get_perfer_cid() { return RTMP_CID_OverConnection; @@ -2072,6 +2163,62 @@ int SrsPublishPacket::decode(SrsStream* stream) return ret; } +int SrsPublishPacket::get_perfer_cid() +{ + return RTMP_CID_OverStream; +} + +int SrsPublishPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsPublishPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name) + + srs_amf0_get_string_size(type); +} + +int SrsPublishPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("encode stream_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode stream_name success."); + + if ((ret = srs_amf0_write_string(stream, type)) != ERROR_SUCCESS) { + srs_error("encode type failed. ret=%d", ret); + return ret; + } + srs_verbose("encode type success."); + + srs_info("encode play request packet success."); + + return ret; +} + SrsPausePacket::SrsPausePacket() { command_name = RTMP_AMF0_COMMAND_PAUSE; @@ -2191,6 +2338,75 @@ int SrsPlayPacket::decode(SrsStream* stream) return ret; } +int SrsPlayPacket::get_perfer_cid() +{ + return RTMP_CID_OverStream; +} + +int SrsPlayPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsPlayPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name) + + srs_amf0_get_number_size() + srs_amf0_get_number_size() + + srs_amf0_get_boolean_size(); +} + +int SrsPlayPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("encode stream_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode stream_name success."); + + if ((ret = srs_amf0_write_number(stream, start)) != ERROR_SUCCESS) { + srs_error("encode start failed. ret=%d", ret); + return ret; + } + srs_verbose("encode start success."); + + if ((ret = srs_amf0_write_number(stream, duration)) != ERROR_SUCCESS) { + srs_error("encode duration failed. ret=%d", ret); + return ret; + } + srs_verbose("encode duration success."); + + if ((ret = srs_amf0_write_boolean(stream, reset)) != ERROR_SUCCESS) { + srs_error("encode reset failed. ret=%d", ret); + return ret; + } + srs_verbose("encode reset success."); + + srs_info("encode play request packet success."); + + return ret; +} + SrsPlayResPacket::SrsPlayResPacket() { command_name = RTMP_AMF0_COMMAND_RESULT; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index bc3fb6502..95784c6b3 100644 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -552,6 +552,13 @@ public: virtual ~SrsCreateStreamPacket(); public: virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); }; /** * response for SrsCreateStreamPacket. @@ -573,6 +580,8 @@ public: public: SrsCreateStreamResPacket(double _transaction_id, double _stream_id); virtual ~SrsCreateStreamResPacket(); +public: + virtual int decode(SrsStream* stream); public: virtual int get_perfer_cid(); public: @@ -662,6 +671,13 @@ public: virtual ~SrsPublishPacket(); public: virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); }; /** @@ -717,6 +733,13 @@ public: virtual ~SrsPlayPacket(); public: virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); }; /** * response for SrsPlayPacket. @@ -1050,6 +1073,7 @@ protected: return CLASS_NAME_STRING(SrsUserControlPacket); } public: + // @see: SrcPCUCEventType int16_t event_type; int32_t event_data; /** diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index d113d3717..7d2cfa684 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -252,12 +252,99 @@ int SrsRtmpClient::connect_app(std::string app, std::string tc_url) return ret; } -int SrsRtmpClient::play_stream(std::string stream, int& stream_id) +int SrsRtmpClient::create_stream(int& stream_id) { int ret = ERROR_SUCCESS; // CreateStream if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket(); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + return ret; + } + } + + // CreateStream _result. + if (true) { + SrsCommonMessage* msg = NULL; + SrsCreateStreamResPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect create stream response message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get create stream response message"); + + stream_id = (int)pkt->stream_id; + } + + return ret; +} + +int SrsRtmpClient::play(std::string stream, int stream_id) +{ + int ret = ERROR_SUCCESS; + + // Play(stream) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsPlayPacket* pkt = new SrsPlayPacket(); + + pkt->stream_name = stream; + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send play stream failed. " + "stream=%s, stream_id=%d, ret=%d", + stream.c_str(), stream_id, ret); + return ret; + } + } + + // SetBufferLength(1000ms) + int buffer_length_ms = 1000; + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsUserControlPacket* pkt = new SrsUserControlPacket(); + + pkt->event_type = SrcPCUCSetBufferLength; + pkt->event_data = stream_id; + pkt->extra_data = buffer_length_ms; + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send set buffer length failed. " + "stream=%s, stream_id=%d, bufferLength=%d, ret=%d", + stream.c_str(), stream_id, buffer_length_ms, ret); + return ret; + } + } + + return ret; +} + +int SrsRtmpClient::publish(std::string stream, int stream_id) +{ + int ret = ERROR_SUCCESS; + + // publish(stream) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsPublishPacket* pkt = new SrsPublishPacket(); + + pkt->stream_name = stream; + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send publish message failed. " + "stream=%s, stream_id=%d, ret=%d", + stream.c_str(), stream_id, ret); + return ret; + } } return ret; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index c3c4ec5f3..b2ad194fb 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -111,7 +111,9 @@ public: public: virtual int handshake(); virtual int connect_app(std::string app, std::string tc_url); - virtual int play_stream(std::string stream, int& stream_id); + virtual int create_stream(int& stream_id); + virtual int play(std::string stream, int stream_id); + virtual int publish(std::string stream, int stream_id); }; /** diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 26b772174..9b2791164 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -409,17 +409,6 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata return ret; } #endif - - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_meta_data(metadata)) != ERROR_SUCCESS) { - srs_error("forwarder process onMetaData message failed. ret=%d", ret); - return ret; - } - } - } metadata->metadata->set("server", new SrsAmf0String( RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); @@ -477,6 +466,18 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata srs_trace("dispatch metadata success."); } + // copy to all forwarders + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) { + srs_error("forwarder process onMetaData message failed. ret=%d", ret); + return ret; + } + } + } + return ret; } @@ -498,17 +499,6 @@ int SrsSource::on_audio(SrsCommonMessage* audio) return ret; } #endif - - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) { - srs_error("forwarder process audio message failed. ret=%d", ret); - return ret; - } - } - } // copy to all consumer if (true) { @@ -523,6 +513,18 @@ int SrsSource::on_audio(SrsCommonMessage* audio) srs_info("dispatch audio success."); } + // copy to all forwarders. + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) { + srs_error("forwarder process audio message failed. ret=%d", ret); + return ret; + } + } + } + // cache the sequence header if h264 if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) { srs_freep(cache_sh_audio); @@ -559,17 +561,6 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } #endif - - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) { - srs_error("forwarder process video message failed. ret=%d", ret); - return ret; - } - } - } // copy to all consumer if (true) { @@ -584,6 +575,18 @@ int SrsSource::on_video(SrsCommonMessage* video) srs_info("dispatch video success."); } + // copy to all forwarders. + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) { + srs_error("forwarder process video message failed. ret=%d", ret); + return ret; + } + } + } + // cache the sequence header if h264 if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) { srs_freep(cache_sh_video);