From 150489af51ebc26551b69808b4d750fb2f557e43 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 22 Oct 2013 23:37:44 +0800 Subject: [PATCH] support ffmpeg publish --- trunk/src/core/srs_core.hpp | 3 ++ trunk/src/core/srs_core_amf0.cpp | 30 +++++++++++++ trunk/src/core/srs_core_amf0.hpp | 6 +++ trunk/src/core/srs_core_client.cpp | 31 +++++++++++--- trunk/src/core/srs_core_protocol.cpp | 63 ++++++++++++++++++---------- trunk/src/core/srs_core_protocol.hpp | 6 +-- trunk/src/core/srs_core_rtmp.cpp | 10 +++-- trunk/src/core/srs_core_rtmp.hpp | 3 +- trunk/src/core/srs_core_source.cpp | 55 +++++++++++++++++++----- trunk/src/core/srs_core_source.hpp | 8 ++-- 10 files changed, 164 insertions(+), 51 deletions(-) diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 5386a8700..d4a576b45 100755 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -66,4 +66,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define RTMP_SIG_SRS_URL "https://github.com/winlinvip/simple-rtmp-server" #define RTMP_SIG_SRS_VERSION "0.1" +// compare +#define srs_min(a, b) ((a < b)? a : b) + #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index 0b67f3dce..8c8faac2d 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -191,6 +191,11 @@ int SrsUnSortedHashtable::size() return (int)properties.size(); } +void SrsUnSortedHashtable::clear() +{ + properties.clear(); +} + std::string SrsUnSortedHashtable::key_at(int index) { srs_assert(index < size()); @@ -255,6 +260,21 @@ SrsAmf0Any* SrsUnSortedHashtable::ensure_property_string(std::string name) return prop; } +SrsAmf0Any* SrsUnSortedHashtable::ensure_property_number(std::string name) +{ + SrsAmf0Any* prop = get_property(name); + + if (!prop) { + return NULL; + } + + if (!prop->is_number()) { + return NULL; + } + + return prop; +} + SrsAmf0Object::SrsAmf0Object() { marker = RTMP_AMF0_Object; @@ -294,6 +314,11 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) return properties.ensure_property_string(name); } +SrsAmf0Any* SrsAmf0Object::ensure_property_number(std::string name) +{ + return properties.ensure_property_number(name); +} + SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray() { marker = RTMP_AMF0_EcmaArray; @@ -308,6 +333,11 @@ int SrsASrsAmf0EcmaArray::size() return properties.size(); } +void SrsASrsAmf0EcmaArray::clear() +{ + properties.clear(); +} + std::string SrsASrsAmf0EcmaArray::key_at(int index) { return properties.key_at(index); diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 8b42a00d7..338632722 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -155,12 +155,14 @@ public: virtual ~SrsUnSortedHashtable(); virtual int size(); + virtual void clear(); virtual std::string key_at(int index); virtual SrsAmf0Any* value_at(int index); virtual void set(std::string key, SrsAmf0Any* value); virtual SrsAmf0Any* get_property(std::string name); virtual SrsAmf0Any* ensure_property_string(std::string name); + virtual SrsAmf0Any* ensure_property_number(std::string name); }; /** @@ -185,6 +187,7 @@ public: virtual SrsAmf0Any* get_property(std::string name); virtual SrsAmf0Any* ensure_property_string(std::string name); + virtual SrsAmf0Any* ensure_property_number(std::string name); }; /** @@ -205,6 +208,7 @@ public: virtual ~SrsASrsAmf0EcmaArray(); virtual int size(); + virtual void clear(); virtual std::string key_at(int index); virtual SrsAmf0Any* value_at(int index); virtual void set(std::string key, SrsAmf0Any* value); @@ -265,6 +269,8 @@ extern int srs_amf0_write_null(SrsStream* stream); extern int srs_amf0_read_undefined(SrsStream* stream); extern int srs_amf0_write_undefined(SrsStream* stream); +extern int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value); + /** * read amf0 object from stream. * 2.5 Object Type diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index fcb10e629..9ce1b1e78 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -157,12 +157,20 @@ int SrsClient::streaming_play(SrsSource* source) { int ret = ERROR_SUCCESS; - SrsConsumer* consumer = source->create_consumer(); + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { + srs_error("create consumer failed. ret=%d", ret); + return ret; + } + srs_assert(consumer != NULL); SrsAutoFree(SrsConsumer, consumer, false); - srs_verbose("consumer created."); + srs_verbose("consumer created success."); while (true) { + // switch to other st-threads. + st_usleep(0); + bool ready = false; if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) { srs_error("wait client control message failed. ret=%d", ret); @@ -183,17 +191,27 @@ int SrsClient::streaming_play(SrsSource* source) } // get messages from consumer. - SrsCommonMessage** msgs = NULL; + SrsSharedPtrMessage** msgs = NULL; int count = 0; if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { srs_error("get messages from consumer failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage*, msgs, true); + + if (count <= 0) { + srs_verbose("no packets in queue."); + continue; + } + SrsAutoFree(SrsSharedPtrMessage*, msgs, true); // sendout messages for (int i = 0; i < count; i++) { - SrsCommonMessage* msg = msgs[i]; + SrsSharedPtrMessage* msg = msgs[i]; + + // the send_message will free the msg, + // so set the msgs[i] to NULL. + msgs[i] = NULL; + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { srs_error("send message to client failed. ret=%d", ret); return ret; @@ -209,6 +227,9 @@ int SrsClient::streaming_publish(SrsSource* source) int ret = ERROR_SUCCESS; while (true) { + // switch to other st-threads. + st_usleep(0); + SrsCommonMessage* msg = NULL; if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { srs_error("recv identify client message failed. ret=%d", ret); diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index b4e0b267f..c0eba29d9 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -409,9 +409,7 @@ int SrsProtocol::send_message(ISrsMessage* msg) // sendout header and payload by writev. // decrease the sys invoke count to get higher performance. int payload_size = msg->size - (p - (char*)msg->payload); - if (payload_size > out_chunk_size) { - payload_size = out_chunk_size; - } + payload_size = srs_min(payload_size, out_chunk_size); // send by writev iovec iov[2]; @@ -821,9 +819,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // the chunk payload size. payload_size = chunk->header.payload_length - chunk->msg->size; - if (payload_size > in_chunk_size) { - payload_size = in_chunk_size; - } + payload_size = srs_min(payload_size, in_chunk_size); srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); @@ -940,12 +936,6 @@ ISrsMessage::ISrsMessage() ISrsMessage::~ISrsMessage() { - free_payload(); -} - -void ISrsMessage::free_payload() -{ - srs_freepa(payload); } SrsCommonMessage::SrsCommonMessage() @@ -956,6 +946,11 @@ SrsCommonMessage::SrsCommonMessage() SrsCommonMessage::~SrsCommonMessage() { + // we must directly free the ptrs, + // nevery use the virtual functions to delete, + // for in the destructor, the virtual functions is disabled. + + srs_freepa(payload); srs_freep(packet); srs_freep(stream); } @@ -1141,10 +1136,6 @@ SrsSharedPtrMessage::SrsSharedPtrMessage() } SrsSharedPtrMessage::~SrsSharedPtrMessage() -{ -} - -void SrsSharedPtrMessage::free_payload() { if (ptr) { if (ptr->shared_count == 0) { @@ -1157,15 +1148,14 @@ void SrsSharedPtrMessage::free_payload() bool SrsSharedPtrMessage::can_decode() { - return true; + return false; } -int SrsSharedPtrMessage::initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid) +int SrsSharedPtrMessage::initialize(ISrsMessage* msg, char* payload, int size) { int ret = ERROR_SUCCESS; - super::header = *header; - + srs_assert(msg != NULL); if (ptr) { ret = ERROR_SYSTEM_ASSERT_FAILED; srs_error("should not set the payload twice. ret=%d", ret); @@ -1174,10 +1164,13 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* header, char* payload, int return ret; } + header = msg->header; + header.payload_length = size; + ptr = new SrsSharedPtr(); ptr->payload = payload; ptr->size = size; - ptr->perfer_cid = perfer_cid; + ptr->perfer_cid = msg->get_perfer_cid(); super::payload = (int8_t*)ptr->payload; super::size = ptr->size; @@ -1194,6 +1187,9 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy() } SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); + + copy->header = header; + copy->ptr = ptr; ptr->shared_count++; @@ -2070,12 +2066,33 @@ int SrsOnMetaDataPacket::decode(SrsStream* stream) srs_verbose("decode metadata name success. name=%s", name.c_str()); - if ((ret = srs_amf0_read_object(stream, metadata)) != ERROR_SUCCESS) { + // the metadata maybe object or ecma array + SrsAmf0Any* any = NULL; + if ((ret = srs_amf0_read_any(stream, any)) != ERROR_SUCCESS) { srs_error("decode metadata metadata failed. ret=%d", ret); return ret; } - srs_info("decode metadata success"); + if (any->is_object()) { + srs_freep(metadata); + metadata = srs_amf0_convert(any); + srs_info("decode metadata object success"); + return ret; + } + + SrsASrsAmf0EcmaArray* arr = dynamic_cast(any); + if (!arr) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("decode metadata array failed. ret=%d", ret); + srs_freep(any); + return ret; + } + + for (int i = 0; i < arr->size(); i++) { + metadata->set(arr->key_at(i), arr->value_at(i)); + } + arr->clear(); + srs_info("decode metadata array success"); return ret; } diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 843d8b77d..14f1dd83b 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -253,8 +253,6 @@ public: public: ISrsMessage(); virtual ~ISrsMessage(); -protected: - virtual void free_payload(); public: /** * whether message canbe decoded. @@ -351,15 +349,13 @@ private: public: SrsSharedPtrMessage(); virtual ~SrsSharedPtrMessage(); -protected: - virtual void free_payload(); public: virtual bool can_decode(); public: /** * set the shared payload. */ - virtual int initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid); + virtual int initialize(ISrsMessage* msg, char* payload, int size); virtual SrsSharedPtrMessage* copy(); public: /** diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 7eaa3b25c..dde8fe1c8 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -114,9 +114,13 @@ int SrsRequest::discovery_app() std::string SrsRequest::get_stream_url() { - std::string url = vhost; + std::string url = ""; + //url += vhost; + + url += "/"; url += app; + url += "/"; url += stream; return url; @@ -152,7 +156,7 @@ int SrsRtmp::can_read(int timeout_ms, bool& ready) return protocol->can_read(timeout_ms, ready); } -int SrsRtmp::send_message(SrsCommonMessage* msg) +int SrsRtmp::send_message(ISrsMessage* msg) { return protocol->send_message(msg); } @@ -233,7 +237,7 @@ int SrsRtmp::connect_app(SrsRequest* req) req->swfUrl = srs_amf0_convert(prop)->value; } - if ((prop = pkt->command_object->ensure_property_string("objectEncoding")) != NULL) { + if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) { req->objectEncoding = srs_amf0_convert(prop)->value; } diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 23e1f72fd..023fe692e 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsProtocol; +class ISrsMessage; class SrsCommonMessage; class SrsCreateStreamPacket; class SrsFMLEStartPacket; @@ -102,7 +103,7 @@ public: public: virtual int recv_message(SrsCommonMessage** pmsg); virtual int can_read(int timeout_ms, bool& ready); - virtual int send_message(SrsCommonMessage* msg); + virtual int send_message(ISrsMessage* msg); public: virtual int handshake(); virtual int connect_app(SrsRequest* req); diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index c135713d9..afbc8e77c 100755 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -46,27 +46,54 @@ SrsConsumer::SrsConsumer() SrsConsumer::~SrsConsumer() { + std::vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + msgs.clear(); } int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; + msgs.push_back(msg); return ret; } -int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count) +int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) { - msgs = NULL; - count = 0; - int ret = ERROR_SUCCESS; + + if (msgs.empty()) { + return ret; + } + + if (max_count == 0) { + count = (int)msgs.size(); + } else { + count = srs_min(max_count, (int)msgs.size()); + } + + pmsgs = new SrsSharedPtrMessage*[count]; + + for (int i = 0; i < count; i++) { + pmsgs[i] = msgs[i]; + } + + if (count == (int)msgs.size()) { + msgs.clear(); + } else { + msgs.erase(msgs.begin(), msgs.begin() + count); + } + return ret; } SrsSource::SrsSource(std::string _stream_url) { stream_url = _stream_url; - cache_metadata = new SrsSharedPtrMessage(); + cache_metadata = NULL; } SrsSource::~SrsSource() @@ -110,7 +137,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata cache_metadata = new SrsSharedPtrMessage(); // dump message to shared ptr message. - if ((ret = cache_metadata->initialize(&msg->header, payload, size, msg->get_perfer_cid())) != ERROR_SUCCESS) { + if ((ret = cache_metadata->initialize(msg, payload, size)) != ERROR_SUCCESS) { srs_error("initialize the cache metadata failed. ret=%d", ret); return ret; } @@ -136,7 +163,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); SrsAutoFree(SrsSharedPtrMessage, msg, false); - if ((ret = msg->initialize(&audio->header, (char*)audio->payload, audio->size, audio->get_perfer_cid())) != ERROR_SUCCESS) { + if ((ret = msg->initialize(audio, (char*)audio->payload, audio->size)) != ERROR_SUCCESS) { srs_error("initialize the audio failed. ret=%d", ret); return ret; } @@ -166,7 +193,7 @@ int SrsSource::on_video(SrsCommonMessage* video) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); SrsAutoFree(SrsSharedPtrMessage, msg, false); - if ((ret = msg->initialize(&video->header, (char*)video->payload, video->size, video->get_perfer_cid())) != ERROR_SUCCESS) { + if ((ret = msg->initialize(video, (char*)video->payload, video->size)) != ERROR_SUCCESS) { srs_error("initialize the video failed. ret=%d", ret); return ret; } @@ -190,10 +217,16 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } -SrsConsumer* SrsSource::create_consumer() + int SrsSource::create_consumer(SrsConsumer*& consumer) { - SrsConsumer* consumer = new SrsConsumer(); + consumer = new SrsConsumer(); consumers.push_back(consumer); - return consumer; + + if (!cache_metadata) { + srs_info("no metadata found."); + return ERROR_SUCCESS; + } + + return consumer->enqueue(cache_metadata->copy()); } diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index b5103be4c..22044357c 100755 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -43,6 +43,8 @@ class SrsSharedPtrMessage; */ class SrsConsumer { +private: + std::vector msgs; public: SrsConsumer(); virtual ~SrsConsumer(); @@ -53,11 +55,11 @@ public: virtual int enqueue(SrsSharedPtrMessage* msg); /** * get packets in consumer queue. - * @msgs SrsMessages*[], output the prt array. + * @pmsgs SrsMessages*[], output the prt array. * @count the count in array. * @max_count the max count to dequeue, 0 to dequeue all. */ - virtual int get_packets(int max_count, SrsCommonMessage**& msgs, int& count); + virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); }; /** @@ -88,7 +90,7 @@ public: virtual int on_audio(SrsCommonMessage* audio); virtual int on_video(SrsCommonMessage* video); public: - virtual SrsConsumer* create_consumer(); + virtual int create_consumer(SrsConsumer*& consumer); }; #endif \ No newline at end of file