diff --git a/trunk/research/hls/ts_info.cc b/trunk/research/hls/ts_info.cc index 9c775abc7..5045ac222 100644 --- a/trunk/research/hls/ts_info.cc +++ b/trunk/research/hls/ts_info.cc @@ -44,7 +44,6 @@ g++ -o ts_info ts_info.cc -g -O0 -ansi #define trace(msg, ...) printf(msg"\n", ##__VA_ARGS__); #define srs_freep(p) delete p; p = NULL -#define srs_freepa(p) delete[] p; p = NULL #define srs_assert(p) assert(p) #define srs_min(a, b) ((a)<(b)? (a):(b)) diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 9f689d9de..1d0dde5e9 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -263,7 +263,7 @@ int SrsDvrPlan::flv_open(string stream, string path) segment->reset(); std::string tmp_file = path + ".tmp"; - if ((ret = fs->open(tmp_file)) != ERROR_SUCCESS) { + if ((ret = fs->open_write(tmp_file)) != ERROR_SUCCESS) { srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret); return ret; } @@ -287,9 +287,7 @@ int SrsDvrPlan::flv_close() { int ret = ERROR_SUCCESS; - if ((ret = fs->close()) != ERROR_SUCCESS) { - return ret; - } + fs->close(); std::string tmp_file = segment->path + ".tmp"; if (rename(tmp_file.c_str(), segment->path.c_str()) < 0) { @@ -552,7 +550,7 @@ int SrsDvrHssPlan::on_meta_data(SrsOnMetaDataPacket* metadata) << req->stream << ".header.flv"; SrsFileStream fs; - if ((ret = fs.open(path.str().c_str())) != ERROR_SUCCESS) { + if ((ret = fs.open_write(path.str().c_str())) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_flv.cpp b/trunk/src/app/srs_app_flv.cpp index fad9f0187..42a723f5d 100644 --- a/trunk/src/app/srs_app_flv.cpp +++ b/trunk/src/app/srs_app_flv.cpp @@ -38,6 +38,9 @@ using namespace std; #include #include +#define SRS_FLV_TAG_HEADER_SIZE 11 +#define SRS_FLV_PREVIOUS_TAG_SIZE 4 + SrsFileStream::SrsFileStream() { fd = -1; @@ -48,7 +51,7 @@ SrsFileStream::~SrsFileStream() close(); } -int SrsFileStream::open(string file) +int SrsFileStream::open_write(string file) { int ret = ERROR_SUCCESS; @@ -72,22 +75,43 @@ int SrsFileStream::open(string file) return ret; } -int SrsFileStream::close() +int SrsFileStream::open_read(string file) +{ + int ret = ERROR_SUCCESS; + + if (fd > 0) { + ret = ERROR_SYSTEM_FILE_ALREADY_OPENED; + srs_error("file %s already opened. ret=%d", _file.c_str(), ret); + return ret; + } + + if ((fd = ::open(file.c_str(), O_RDONLY)) < 0) { + ret = ERROR_SYSTEM_FILE_OPENE; + srs_error("open file %s failed. ret=%d", file.c_str(), ret); + return ret; + } + + _file = file; + + return ret; +} + +void SrsFileStream::close() { int ret = ERROR_SUCCESS; if (fd < 0) { - return ret; + return; } if (::close(fd) < 0) { ret = ERROR_SYSTEM_FILE_CLOSE; srs_error("close file %s failed. ret=%d", _file.c_str(), ret); - return ret; + return; } fd = -1; - return ret; + return; } bool SrsFileStream::is_open() @@ -141,6 +165,24 @@ int64_t SrsFileStream::tellg() return (int64_t)::lseek(fd, 0, SEEK_CUR); } +int64_t SrsFileStream::lseek(int64_t offset) +{ + return (int64_t)::lseek(fd, offset, SEEK_SET); +} + +int64_t SrsFileStream::filesize() +{ + int64_t cur = tellg(); + int64_t size = (int64_t)::lseek(fd, 0, SEEK_END); + ::lseek(fd, cur, SEEK_SET); + return size; +} + +void SrsFileStream::skip(int64_t size) +{ + ::lseek(fd, size, SEEK_CUR); +} + SrsFlvEncoder::SrsFlvEncoder() { _fs = NULL; @@ -165,6 +207,7 @@ int SrsFlvEncoder::write_header() { int ret = ERROR_SUCCESS; + // 9bytes header and 4bytes first previous-tag-size static char flv_header[] = { 'F', 'L', 'V', // Signatures "FLV" (char)0x01, // File version (for example, 0x01 for FLV version 1) @@ -218,6 +261,7 @@ int SrsFlvEncoder::write_audio(int64_t timestamp, char* data, int size) timestamp &= 0x7fffffff; + // 11bytes tag header static char tag_header[] = { (char)8, // TagType UB [5], 8 = audio (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. @@ -249,6 +293,7 @@ int SrsFlvEncoder::write_video(int64_t timestamp, char* data, int size) timestamp &= 0x7fffffff; + // 11bytes tag header static char tag_header[] = { (char)9, // TagType UB [5], 9 = video (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. @@ -291,8 +336,8 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s } // PreviousTagSizeN UI32 Size of last tag, including its header, in bytes. - static char pre_size[4]; - if ((ret = tag_stream->initialize(pre_size, 4)) != ERROR_SUCCESS) { + static char pre_size[SRS_FLV_PREVIOUS_TAG_SIZE]; + if ((ret = tag_stream->initialize(pre_size, SRS_FLV_PREVIOUS_TAG_SIZE)) != ERROR_SUCCESS) { return ret; } tag_stream->write_4bytes(tag_size + header_size); @@ -304,3 +349,163 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s return ret; } +SrsFlvFastDecoder::SrsFlvFastDecoder() +{ + _fs = NULL; + tag_stream = new SrsStream(); +} + +SrsFlvFastDecoder::~SrsFlvFastDecoder() +{ + srs_freep(tag_stream); +} + +int SrsFlvFastDecoder::initialize(SrsFileStream* fs) +{ + int ret = ERROR_SUCCESS; + + _fs = fs; + + return ret; +} + +int SrsFlvFastDecoder::read_header(char** pdata, int* psize) +{ + *pdata = NULL; + *psize = 0; + + int ret = ERROR_SUCCESS; + + srs_assert(_fs); + + // 9bytes header and 4bytes first previous-tag-size + int size = 13; + char* buf = new char[size]; + + if ((ret = _fs->read(buf, size, NULL)) != ERROR_SUCCESS) { + return ret; + } + + *pdata = buf; + *psize = size; + + return ret; +} + +int SrsFlvFastDecoder::read_sequence_header(int64_t* pstart, int* psize) +{ + *pstart = 0; + *psize = 0; + + int ret = ERROR_SUCCESS; + + srs_assert(_fs); + + // simply, the first video/audio must be the sequence header. + // and must be a sequence video and audio. + + // 11bytes tag header + static char tag_header[] = { + (char)0x00, // TagType UB [5], 9 = video, 8 = audio, 18 = script data + (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. + (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. + (char)0x00, // TimestampExtended UI8 + (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. + }; + + // discovery the sequence header video and audio. + // @remark, maybe no video or no audio. + bool got_video = false; + bool got_audio = false; + // audio/video sequence and data offset. + int64_t av_sequence_offset_start = -1; + int64_t av_sequence_offset_end = -1; + for (;;) { + if ((ret = _fs->read(tag_header, SRS_FLV_TAG_HEADER_SIZE, NULL)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = tag_stream->initialize(tag_header, SRS_FLV_TAG_HEADER_SIZE)) != ERROR_SUCCESS) { + return ret; + } + + int8_t tag_type = tag_stream->read_1bytes(); + int32_t data_size = tag_stream->read_3bytes(); + + bool is_video = tag_type == 0x09; + bool is_audio = tag_type == 0x08; + bool is_not_av = !is_video && !is_audio; + if (is_not_av) { + // skip body and tag size. + _fs->skip(data_size + SRS_FLV_PREVIOUS_TAG_SIZE); + continue; + } + + // if video duplicated, no audio + if (is_video && got_video) { + break; + } + // if audio duplicated, no video + if (is_audio && got_audio) { + break; + } + + // video + if (is_video) { + srs_assert(!got_video); + got_video = true; + + if (av_sequence_offset_start < 0) { + av_sequence_offset_start = _fs->tellg() - SRS_FLV_TAG_HEADER_SIZE; + } + av_sequence_offset_end = _fs->tellg() + data_size + SRS_FLV_PREVIOUS_TAG_SIZE; + _fs->skip(data_size + SRS_FLV_PREVIOUS_TAG_SIZE); + } + + // audio + if (is_audio) { + srs_assert(!got_audio); + got_audio = true; + + if (av_sequence_offset_start < 0) { + av_sequence_offset_start = _fs->tellg() - SRS_FLV_TAG_HEADER_SIZE; + } + av_sequence_offset_end = _fs->tellg() + data_size + SRS_FLV_PREVIOUS_TAG_SIZE; + _fs->skip(data_size + SRS_FLV_PREVIOUS_TAG_SIZE); + } + } + + // seek to the sequence header start offset. + if (av_sequence_offset_start > 0) { + _fs->lseek(av_sequence_offset_start); + *pstart = av_sequence_offset_start; + *psize = (int)(av_sequence_offset_end - av_sequence_offset_start); + } + + return ret; +} + +int SrsFlvFastDecoder::lseek(int64_t offset) +{ + int ret = ERROR_SUCCESS; + + srs_assert(_fs); + + if (offset >= _fs->filesize()) { + ret = ERROR_SYSTEM_FILE_EOF; + srs_warn("flv fast decoder seek overflow file, " + "size=%"PRId64", offset=%"PRId64", ret=%d", + _fs->filesize(), offset, ret); + return ret; + } + + if (_fs->lseek(offset) < 0) { + ret = ERROR_SYSTEM_FILE_SEEK; + srs_warn("flv fast decoder seek error, " + "size=%"PRId64", offset=%"PRId64", ret=%d", + _fs->filesize(), offset, ret); + return ret; + } + + return ret; +} diff --git a/trunk/src/app/srs_app_flv.hpp b/trunk/src/app/srs_app_flv.hpp index da2fae734..6c8b0d76f 100644 --- a/trunk/src/app/srs_app_flv.hpp +++ b/trunk/src/app/srs_app_flv.hpp @@ -43,8 +43,9 @@ public: SrsFileStream(); virtual ~SrsFileStream(); public: - virtual int open(std::string file); - virtual int close(); + virtual int open_write(std::string file); + virtual int open_read(std::string file); + virtual void close(); virtual bool is_open(); public: /** @@ -59,6 +60,9 @@ public: * tell current offset of stream. */ virtual int64_t tellg(); + virtual int64_t lseek(int64_t offset); + virtual int64_t filesize(); + virtual void skip(int64_t size); }; /** @@ -104,4 +108,38 @@ private: virtual int write_tag(char* header, int header_size, char* tag, int tag_size); }; +/** +* decode flv fast by only decoding the header and tag. +*/ +class SrsFlvFastDecoder +{ +private: + SrsFileStream* _fs; +private: + SrsStream* tag_stream; +public: + SrsFlvFastDecoder(); + virtual ~SrsFlvFastDecoder(); +public: + /** + * initialize the underlayer file stream, + * user can initialize multiple times to encode multiple flv files. + */ + virtual int initialize(SrsFileStream* fs); +public: + /** + * read the flv header and size. + */ + virtual int read_header(char** pdata, int* psize); + /** + * read the sequence header and size. + */ + virtual int read_sequence_header(int64_t* pstart, int* psize); +public: + /** + * for start offset, seed to this position and response flv stream. + */ + virtual int lseek(int64_t offset); +}; + #endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 8a1f2d130..228cff627 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -707,6 +707,28 @@ void SrsHttpMessage::append_body(const char* body, int length) _body->append(body, length); } +string SrsHttpMessage::query_get(string key) +{ + std::string q = query(); + size_t pos = std::string::npos; + + // must format as key=value&...&keyN=valueN + if ((pos = key.find("=")) != key.length() - 1) { + key = key + "="; + } + + if ((pos = q.find(key)) == std::string::npos) { + return ""; + } + + std::string v = q.substr(pos + key.length()); + if ((pos = v.find("&")) != std::string::npos) { + v = v.substr(0, pos); + } + + return v; +} + int SrsHttpMessage::request_header_count() { return (int)headers.size(); diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index 869b14e40..21a2c4e77 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -358,6 +358,13 @@ public: virtual void set_match(SrsHttpHandlerMatch* match); virtual void set_requires_crossdomain(bool requires_crossdomain); virtual void append_body(const char* body, int length); +public: + /** + * get the param in query string, + * for instance, query is "start=100&end=200", + * then query_get("start") is "100", and query_get("end") is "200" + */ + virtual std::string query_get(std::string key); public: virtual int request_header_count(); virtual std::string request_header_key_at(int index); diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index e8b404c4e..3b8bb0a1b 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -25,12 +25,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_HTTP_SERVER -#include -using namespace std; - #include #include #include +#include + +#include +using namespace std; #include #include @@ -39,6 +40,7 @@ using namespace std; #include #include #include +#include #define SRS_HTTP_DEFAULT_PAGE "index.html" @@ -166,7 +168,17 @@ int SrsHttpVhost::do_process_request(SrsSocket* skt, SrsHttpMessage* req) if (srs_string_ends_with(fullpath, ".ts")) { return response_ts_file(skt, req, fullpath); } else if (srs_string_ends_with(fullpath, ".flv") || srs_string_ends_with(fullpath, ".fhv")) { - return response_flv_file(skt, req, fullpath); + std::string start = req->query_get("start"); + if (start.empty()) { + return response_flv_file(skt, req, fullpath); + } + + int offset = ::atoi(start.c_str()); + if (offset <= 0) { + return response_flv_file(skt, req, fullpath); + } + + return response_flv_file2(skt, req, fullpath, offset); } else { return response_regular_file(skt, req, fullpath); } @@ -283,6 +295,112 @@ int SrsHttpVhost::response_flv_file(SrsSocket* skt, SrsHttpMessage* req, string return ret; } +int SrsHttpVhost::response_flv_file2(SrsSocket* skt, SrsHttpMessage* req, string fullpath, int offset) +{ + int ret = ERROR_SUCCESS; + + SrsFileStream fs; + + // open flv file + if ((ret = fs.open_read(fullpath)) != ERROR_SUCCESS) { + return ret; + } + + if (offset > fs.filesize()) { + ret = ERROR_HTTP_FLV_OFFSET_OVERFLOW; + srs_warn("http flv streaming %s overflow. size=%"PRId64", offset=%d, ret=%d", + fullpath.c_str(), fs.filesize(), offset, ret); + return ret; + } + + SrsFlvFastDecoder ffd; + + // open fast decoder + if ((ret = ffd.initialize(&fs)) != ERROR_SUCCESS) { + return ret; + } + + // save header, send later. + char* flv_header = NULL; + int flv_size = 0; + + // send flv header + if ((ret = ffd.read_header(&flv_header, &flv_size)) != ERROR_SUCCESS) { + return ret; + } + SrsAutoFree(char, flv_header); + + // save sequence header, send later + char* sh_data = NULL; + int sh_size = 0; + + if (true) { + // send sequence header + int64_t start = 0; + if ((ret = ffd.read_sequence_header(&start, &sh_size)) != ERROR_SUCCESS) { + return ret; + } + if (sh_size <= 0) { + ret = ERROR_HTTP_FLV_SEQUENCE_HEADER; + srs_warn("http flv streaming no sequence header. size=%d, ret=%d", sh_size, ret); + return ret; + } + } + sh_data = new char[sh_size]; + SrsAutoFree(char, sh_data); + if ((ret = fs.read(sh_data, sh_size, NULL)) != ERROR_SUCCESS) { + return ret; + } + + // seek to data offset + int64_t left = fs.filesize() - offset; + + // write http header for ts. + std::stringstream ss; + + res_status_line(ss)->res_content_type_flv(ss) + ->res_content_length(ss, (int)(flv_size + sh_size + left)); + + if (req->requires_crossdomain()) { + res_enable_crossdomain(ss); + } + + res_header_eof(ss); + + // flush http header to peer + if ((ret = res_flush(skt, ss)) != ERROR_SUCCESS) { + return ret; + } + + if (flv_size > 0 && (ret = skt->write(flv_header, flv_size, NULL)) != ERROR_SUCCESS) { + return ret; + } + if (sh_size > 0 && (ret = skt->write(sh_data, sh_size, NULL)) != ERROR_SUCCESS) { + return ret; + } + + // write body. + char* buf = req->http_ts_send_buffer(); + if ((ret = ffd.lseek(offset)) != ERROR_SUCCESS) { + return ret; + } + + // send data + while (left > 0) { + ssize_t nread = -1; + if ((ret = fs.read(buf, HTTP_TS_SEND_BUFFER_SIZE, &nread)) != ERROR_SUCCESS) { + return ret; + } + + left -= nread; + if ((ret = skt->write(buf, nread, NULL)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + int SrsHttpVhost::response_ts_file(SrsSocket* skt, SrsHttpMessage* req, string fullpath) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 12a4e1795..439ff987e 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -72,6 +72,7 @@ protected: private: virtual int response_regular_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath); virtual int response_flv_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath); + virtual int response_flv_file2(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath, int offset); virtual int response_ts_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath); virtual std::string get_request_file(SrsHttpMessage* req); public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index ec5f0e865..944692a70 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -306,6 +306,7 @@ int SrsRtmpConn::stream_service_cycle() case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); + // notice edge to start for the first client. if (vhost_is_edge) { if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { srs_error("notice edge start play stream failed. ret=%d", ret); @@ -313,6 +314,7 @@ int SrsRtmpConn::stream_service_cycle() } } + // response connection start play if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index a63b79790..a56f84802 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "114" +#define VERSION_REVISION "113" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 1a1a800fc..e04051e70 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -117,6 +117,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_FILE_EOF 428 #define ERROR_SYSTEM_FILE_RENAME 429 #define ERROR_SYSTEM_CREATE_PIPE 430 +#define ERROR_SYSTEM_FILE_SEEK 431 // see librtmp. // failed when open ssl create the dh @@ -184,6 +185,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_OPEN_FILE 805 #define ERROR_HTTP_READ_FILE 806 #define ERROR_HTTP_API_LOGS 807 +#define ERROR_HTTP_FLV_SEQUENCE_HEADER 808 +#define ERROR_HTTP_FLV_OFFSET_OVERFLOW 809 // system control message, // not an error, but special control logic.