From c338eb36667aa6632e448a37ab43fb84015598c3 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 2 Mar 2014 14:51:19 +0800 Subject: [PATCH] srs-librtmp: implements the publish and play. --- trunk/research/librtmp/srs_publish.c | 2 +- trunk/src/app/srs_core_client.cpp | 3 +- trunk/src/libs/srs_librtmp.cpp | 4 + trunk/src/rtmp/srs_protocol_rtmp.cpp | 92 +++++++++++++++ trunk/src/rtmp/srs_protocol_rtmp.hpp | 6 + trunk/src/rtmp/srs_protocol_rtmp_stack.cpp | 129 ++++++++++++++++++++- trunk/src/rtmp/srs_protocol_rtmp_stack.hpp | 12 ++ 7 files changed, 242 insertions(+), 6 deletions(-) diff --git a/trunk/research/librtmp/srs_publish.c b/trunk/research/librtmp/srs_publish.c index f85e8fb3d..0e10eceb2 100644 --- a/trunk/research/librtmp/srs_publish.c +++ b/trunk/research/librtmp/srs_publish.c @@ -35,7 +35,7 @@ int main(int argc, char** argv) printf("srs(simple-rtmp-server) client librtmp library.\n"); printf("version: %d.%d.%d\n", srs_version_major(), srs_version_minor(), srs_version_revision()); - rtmp = srs_rtmp_create("rtmp://127.0.0.1:1936/live/livestream"); + rtmp = srs_rtmp_create("rtmp://127.0.0.1:1935/live/livestream"); if (srs_simple_handshake(rtmp) != 0) { printf("simple handshake failed.\n"); diff --git a/trunk/src/app/srs_core_client.cpp b/trunk/src/app/srs_core_client.cpp index 76cebb7c2..a005bf817 100644 --- a/trunk/src/app/srs_core_client.cpp +++ b/trunk/src/app/srs_core_client.cpp @@ -230,7 +230,8 @@ int SrsClient::stream_service_cycle() return ret; } req->strip(); - srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); + srs_trace("identify client success. type=%s, stream_name=%s", + srs_client_type_string(type).c_str(), req->stream.c_str()); // client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 2dd99b330..1683299a2 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -249,6 +249,10 @@ int srs_publish_stream(srs_rtmp_t rtmp) srs_assert(rtmp != NULL); Context* context = (Context*)rtmp; + if ((ret = context->rtmp->fmle_publish(context->stream, context->stream_id)) != ERROR_SUCCESS) { + return ret; + } + return ret; } diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index ec319b91a..b9b6e2a4f 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -176,6 +176,17 @@ SrsResponse::~SrsResponse() { } +string srs_client_type_string(SrsClientType type) +{ + switch (type) { + case SrsClientPlay: return "Play"; + case SrsClientFlashPublish: return "FlashPublish"; + case SrsClientFMLEPublish: return "FMLEPublish"; + default: return "Unknown"; + } + return "Unknown"; +} + SrsRtmpClient::SrsRtmpClient(ISrsProtocolReaderWriter* skt) { io = skt; @@ -441,6 +452,87 @@ int SrsRtmpClient::publish(string stream, int stream_id) return ret; } +int SrsRtmpClient::fmle_publish(string stream, int& stream_id) +{ + stream_id = 0; + + int ret = ERROR_SUCCESS; + + // SrsFMLEStartPacket + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_release_stream(stream); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FMLE publish " + "release stream failed. stream=%s, ret=%d", stream.c_str(), ret); + return ret; + } + } + + // FCPublish + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_FC_publish(stream); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FMLE publish " + "FCPublish failed. stream=%s, ret=%d", stream.c_str(), ret); + return ret; + } + } + + // CreateStream + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket(); + + pkt->transaction_id = 4; + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FMLE publish " + "createStream failed. stream=%s, ret=%d", stream.c_str(), ret); + return ret; + } + } + + // expect result of CreateStream + if (true) { + SrsCommonMessage* msg = NULL; + SrsCreateStreamResPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect create stream response message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get create stream response message"); + + stream_id = (int)pkt->stream_id; + } + + // publish(stream) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsPublishPacket* pkt = new SrsPublishPacket(); + + pkt->stream_name = stream; + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FMLE publish publish failed. " + "stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret); + return ret; + } + } + + return ret; +} + SrsRtmpServer::SrsRtmpServer(ISrsProtocolReaderWriter* skt) { io = skt; diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 3231f1537..4f4cc40cd 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -106,6 +106,7 @@ enum SrsClientType SrsClientFMLEPublish, SrsClientFlashPublish, }; +std::string srs_client_type_string(SrsClientType type); /** * implements the client role protocol. @@ -137,7 +138,12 @@ public: virtual int connect_app(std::string app, std::string tc_url); virtual int create_stream(int& stream_id); virtual int play(std::string stream, int stream_id); + // flash publish schema: + // connect-app => create-stream => flash-publish virtual int publish(std::string stream, int stream_id); + // FMLE publish schema: + // connect-app => FMLE publish + virtual int fmle_publish(std::string stream, int& stream_id); }; /** diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 18b9f58e6..e2b99c70a 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -676,7 +676,7 @@ int SrsProtocol::on_send_message(ISrsMessage* msg) SrsConnectAppPacket* pkt = NULL; pkt = dynamic_cast(common_msg->get_packet()); if (pkt) { - requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT; + requests[pkt->transaction_id] = pkt->command_name; break; } } @@ -684,7 +684,15 @@ int SrsProtocol::on_send_message(ISrsMessage* msg) SrsCreateStreamPacket* pkt = NULL; pkt = dynamic_cast(common_msg->get_packet()); if (pkt) { - requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CREATE_STREAM; + requests[pkt->transaction_id] = pkt->command_name; + break; + } + } + if (true) { + SrsFMLEStartPacket* pkt = NULL; + pkt = dynamic_cast(common_msg->get_packet()); + if (pkt) { + requests[pkt->transaction_id] = pkt->command_name; break; } } @@ -1291,13 +1299,19 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str()); if (request_name == RTMP_AMF0_COMMAND_CONNECT) { - srs_info("decode the AMF0/AMF3 response command(connect vhost/app message)."); + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); packet = new SrsConnectAppResPacket(); return packet->decode(stream); } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) { - srs_info("decode the AMF0/AMF3 response command(createStream message)."); + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); packet = new SrsCreateStreamResPacket(0, 0); return packet->decode(stream); + } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM + || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH + || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) { + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); + packet = new SrsFMLEStartResPacket(0); + return packet->decode(stream); } else { ret = ERROR_RTMP_NO_REQUEST; srs_error("decode AMF0/AMF3 request failed. " @@ -2157,6 +2171,78 @@ int SrsFMLEStartPacket::decode(SrsStream* stream) return ret; } +int SrsFMLEStartPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsFMLEStartPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsFMLEStartPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name); +} + +int SrsFMLEStartPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("encode stream_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode stream_name success."); + + + srs_info("encode FMLE start response packet success."); + + return ret; +} + +SrsFMLEStartPacket* SrsFMLEStartPacket::create_release_stream(string stream) +{ + SrsFMLEStartPacket* pkt = new SrsFMLEStartPacket(); + + pkt->command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM; + pkt->transaction_id = 2; + pkt->stream_name = stream; + + return pkt; +} + +SrsFMLEStartPacket* SrsFMLEStartPacket::create_FC_publish(string stream) +{ + SrsFMLEStartPacket* pkt = new SrsFMLEStartPacket(); + + pkt->command_name = RTMP_AMF0_COMMAND_FC_PUBLISH; + pkt->transaction_id = 3; + pkt->stream_name = stream; + + return pkt; +} + SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id) { command_name = RTMP_AMF0_COMMAND_RESULT; @@ -2171,6 +2257,41 @@ SrsFMLEStartResPacket::~SrsFMLEStartResPacket() srs_freep(args); } +int SrsFMLEStartResPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start response command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode FMLE start response command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start response transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start response command_object failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_undefined(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start response stream_id failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode FMLE start packet success"); + + return ret; +} + int SrsFMLEStartResPacket::get_perfer_cid() { return RTMP_CID_OverConnection; diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index 7b0cc76ef..cbca5f654 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -681,6 +681,16 @@ public: virtual ~SrsFMLEStartPacket(); public: virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +public: + static SrsFMLEStartPacket* create_release_stream(std::string stream); + static SrsFMLEStartPacket* create_FC_publish(std::string stream); }; /** * response for SrsFMLEStartPacket. @@ -702,6 +712,8 @@ public: public: SrsFMLEStartResPacket(double _transaction_id); virtual ~SrsFMLEStartResPacket(); +public: + virtual int decode(SrsStream* stream); public: virtual int get_perfer_cid(); public: