1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

fix #136, support hls without io(in ram). 2.0.112

This commit is contained in:
winlin 2015-02-03 16:01:07 +08:00
parent 89b37d3469
commit a23191497f
14 changed files with 688 additions and 87 deletions

View file

@ -473,6 +473,19 @@ vhost with-hls.srs.com {
# in a word, the hls_path is for vhost.
# default: ./objs/nginx/html
hls_path ./objs/nginx/html;
# the hls storage: disk, ram or both.
# disk, to write hls m3u8/ts to disk.
# ram, serve m3u8/ts in memory, which use embeded http server to delivery.
# both, disk and ram.
# default: disk
hls_storage disk;
# the hls mount for hls_storage ram,
# which use srs embeded http server to delivery HLS,
# where the mount specifies the HTTP url to mount.
# @see the mount of http_remux.
# @remark the hls_mount must endswith .m3u8.
# default: [vhost]/[app]/[stream].m3u8
hls_mount [vhost]/[app]/[stream].m3u8;
}
}
# the vhost with hls disabled.

View file

@ -1479,7 +1479,9 @@ int SrsConfig::check_config()
} else if (n == "hls") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error") {
if (m != "enabled" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error"
&& m != "hls_storage" && m != "hls_mount"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost hls directive %s, ret=%d", m.c_str(), ret);
return ret;
@ -1799,6 +1801,7 @@ int SrsConfig::check_config()
}
}
#endif
// TODO: FIXME: required http server when hls storage is ram or both.
}
return ret;
@ -3278,6 +3281,40 @@ string SrsConfig::get_hls_on_error(string vhost)
return conf->arg0();
}
string SrsConfig::get_hls_storage(string vhost)
{
SrsConfDirective* hls = get_hls(vhost);
if (!hls) {
return SRS_CONF_DEFAULT_HLS_STORAGE;
}
SrsConfDirective* conf = hls->get("hls_storage");
if (!conf) {
return SRS_CONF_DEFAULT_HLS_STORAGE;
}
return conf->arg0();
}
string SrsConfig::get_hls_mount(string vhost)
{
SrsConfDirective* hls = get_hls(vhost);
if (!hls) {
return SRS_CONF_DEFAULT_HLS_MOUNT;
}
SrsConfDirective* conf = hls->get("hls_mount");
if (!conf) {
return SRS_CONF_DEFAULT_HLS_MOUNT;
}
return conf->arg0();
}
SrsConfDirective* SrsConfig::get_dvr(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -52,6 +52,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_HLS_ON_ERROR_DISCONNECT "disconnect"
#define SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE "continue"
#define SRS_CONF_DEFAULT_HLS_ON_ERROR SRS_CONF_DEFAULT_HLS_ON_ERROR_IGNORE
#define SRS_CONF_DEFAULT_HLS_STORAGE "disk"
#define SRS_CONF_DEFAULT_HLS_MOUNT "[vhost]/[app]/[stream].m3u8"
#define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session"
#define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment"
@ -906,6 +908,14 @@ public:
* @see https://github.com/winlinvip/simple-rtmp-server/issues/264
*/
virtual std::string get_hls_on_error(std::string vhost);
/**
* get the HLS storage type.
*/
virtual std::string get_hls_storage(std::string vhost);
/**
* get the HLS mount url for HTTP server.
*/
virtual std::string get_hls_mount(std::string vhost);
// dvr section
private:
/**

View file

@ -55,13 +55,87 @@ using namespace std;
// drop the segment when duration of ts too small.
#define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100
SrsHlsSegment::SrsHlsSegment()
ISrsHlsHandler::ISrsHlsHandler()
{
}
ISrsHlsHandler::~ISrsHlsHandler()
{
}
SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file)
{
should_write_cache = write_cache;
should_write_file = write_file;
}
SrsHlsCacheWriter::~SrsHlsCacheWriter()
{
}
int SrsHlsCacheWriter::open(string file)
{
if (!should_write_file) {
return ERROR_SUCCESS;
}
return impl.open(file);
}
void SrsHlsCacheWriter::close()
{
if (!should_write_file) {
return;
}
impl.close();
}
bool SrsHlsCacheWriter::is_open()
{
if (!should_write_file) {
return true;
}
return impl.is_open();
}
int64_t SrsHlsCacheWriter::tellg()
{
if (!should_write_file) {
return 0;
}
return impl.tellg();
}
int SrsHlsCacheWriter::write(void* buf, size_t count, ssize_t* pnwrite)
{
if (should_write_cache) {
if (count > 0) {
data.append((char*)buf, count);
}
}
if (should_write_file) {
return impl.write(buf, count, pnwrite);
}
return ERROR_SUCCESS;
}
string SrsHlsCacheWriter::cache()
{
return data;
}
SrsHlsSegment::SrsHlsSegment(bool write_cache, bool write_file)
{
duration = 0;
sequence_no = 0;
segment_start_dts = 0;
is_sequence_header = false;
writer = new SrsFileWriter();
writer = new SrsHlsCacheWriter(write_cache, write_file);
muxer = new SrsTSMuxer(writer);
}
@ -87,12 +161,16 @@ void SrsHlsSegment::update_duration(int64_t current_frame_dts)
return;
}
SrsHlsMuxer::SrsHlsMuxer()
SrsHlsMuxer::SrsHlsMuxer(ISrsHlsHandler* h)
{
req = NULL;
handler = h;
hls_fragment = hls_window = 0;
_sequence_no = 0;
current = NULL;
acodec = SrsCodecAudioReserved1;
should_write_cache = false;
should_write_file = true;
}
SrsHlsMuxer::~SrsHlsMuxer()
@ -105,6 +183,7 @@ SrsHlsMuxer::~SrsHlsMuxer()
segments.clear();
srs_freep(current);
srs_freep(req);
}
int SrsHlsMuxer::sequence_no()
@ -112,17 +191,30 @@ int SrsHlsMuxer::sequence_no()
return _sequence_no;
}
int SrsHlsMuxer::update_config(
string _app, string _stream, string path, int fragment, int window
) {
int SrsHlsMuxer::update_config(SrsRequest* r, string path, int fragment, int window)
{
int ret = ERROR_SUCCESS;
app = _app;
stream = _stream;
srs_freep(req);
req = r->copy();
hls_path = path;
hls_fragment = fragment;
hls_window = window;
std::string storage = _srs_config->get_hls_storage(r->vhost);
if (storage == "ram") {
should_write_cache = true;
should_write_file = false;
} else if (storage == "disk") {
should_write_cache = false;
should_write_file = true;
} else {
srs_assert(storage == "both");
should_write_cache = true;
should_write_file = true;
}
return ret;
}
@ -137,7 +229,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
// TODO: create all parents dirs.
// create dir for app.
if ((ret = create_dir()) != ERROR_SUCCESS) {
if (should_write_file && (ret = create_dir()) != ERROR_SUCCESS) {
return ret;
}
@ -145,19 +237,19 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
srs_assert(!current);
// new segment.
current = new SrsHlsSegment();
current = new SrsHlsSegment(should_write_cache, should_write_file);
current->sequence_no = _sequence_no++;
current->segment_start_dts = segment_start_dts;
// generate filename.
char filename[128];
snprintf(filename, sizeof(filename),
"%s-%d.ts", stream.c_str(), current->sequence_no);
"%s-%d.ts", req->stream.c_str(), current->sequence_no);
// TODO: use temp file and rename it.
current->full_path = hls_path;
current->full_path += "/";
current->full_path += app;
current->full_path += req->app;
current->full_path += "/";
current->full_path += filename;
@ -290,6 +382,13 @@ int SrsHlsMuxer::segment_close(string log_desc)
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,
current->segment_start_dts);
// notify handler for update ts.
srs_assert(current->writer);
if (handler && (ret = handler->on_update_ts(req, current->uri, current->writer->cache())) != ERROR_SUCCESS) {
srs_error("notify handler for update ts failed. ret=%d", ret);
return ret;
}
// close the muxer of finished segment.
srs_freep(current->muxer);
std::string full_path = current->full_path;
@ -297,7 +396,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
// rename from tmp to real path
std::string tmp_file = full_path + ".tmp";
if (rename(tmp_file.c_str(), full_path.c_str()) < 0) {
if (should_write_file && rename(tmp_file.c_str(), full_path.c_str()) < 0) {
ret = ERROR_HLS_WRITE_FAILED;
srs_error("rename ts file failed, %s => %s. ret=%d",
tmp_file.c_str(), full_path.c_str(), ret);
@ -313,7 +412,9 @@ int SrsHlsMuxer::segment_close(string log_desc)
// rename from tmp to real path
std::string tmp_file = current->full_path + ".tmp";
if (should_write_file) {
unlink(tmp_file.c_str());
}
srs_freep(current);
}
@ -365,22 +466,18 @@ int SrsHlsMuxer::refresh_m3u8()
std::string m3u8_file = hls_path;
m3u8_file += "/";
m3u8_file += app;
m3u8_file += req->app;
m3u8_file += "/";
m3u8_file += stream;
m3u8_file += req->stream;
m3u8_file += ".m3u8";
m3u8 = m3u8_file;
m3u8_file += ".temp";
int fd = -1;
ret = _refresh_m3u8(fd, m3u8_file);
if (fd >= 0) {
close(fd);
if (rename(m3u8_file.c_str(), m3u8.c_str()) < 0) {
if ((ret = _refresh_m3u8(m3u8_file)) == ERROR_SUCCESS) {
if (should_write_file && rename(m3u8_file.c_str(), m3u8.c_str()) < 0) {
ret = ERROR_HLS_WRITE_FAILED;
srs_error("rename m3u8 file failed. "
"%s => %s, ret=%d", m3u8_file.c_str(), m3u8.c_str(), ret);
srs_error("rename m3u8 file failed. %s => %s, ret=%d", m3u8_file.c_str(), m3u8.c_str(), ret);
}
}
@ -390,7 +487,7 @@ int SrsHlsMuxer::refresh_m3u8()
return ret;
}
int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
{
int ret = ERROR_SUCCESS;
@ -399,10 +496,8 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
return ret;
}
int flags = O_CREAT|O_WRONLY|O_TRUNC;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((fd = ::open(m3u8_file.c_str(), flags, mode)) < 0) {
ret = ERROR_HLS_OPEN_FAILED;
SrsHlsCacheWriter writer(should_write_cache, should_write_file);
if ((ret = writer.open(m3u8_file)) != ERROR_SUCCESS) {
srs_error("open m3u8 file %s failed. ret=%d", m3u8_file.c_str(), ret);
return ret;
}
@ -419,8 +514,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
0x23, 0x45, 0x58, 0x54, 0x2d, 0x58, 0x2d, 0x41, 0x4c, 0x4c,
0x4f, 0x57, 0x2d, 0x43, 0x41, 0x43, 0x48, 0x45, 0x3a, 0x4e, 0x4f, 0x0a
};
if (::write(fd, header, sizeof(header)) != sizeof(header)) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write(header, sizeof(header), NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 header failed. ret=%d", ret);
return ret;
}
@ -430,8 +524,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
SrsHlsSegment* first = *segments.begin();
char sequence[34] = {};
int len = snprintf(sequence, sizeof(sequence), "#EXT-X-MEDIA-SEQUENCE:%d\n", first->sequence_no);
if (::write(fd, sequence, len) != len) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write(sequence, len, NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 sequence failed. ret=%d", ret);
return ret;
}
@ -448,8 +541,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
target_duration += 1;
char duration[34]; // 23+10+1
len = snprintf(duration, sizeof(duration), "#EXT-X-TARGETDURATION:%d\n", target_duration);
if (::write(fd, duration, len) != len) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write(duration, len, NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 duration failed. ret=%d", ret);
return ret;
}
@ -463,8 +555,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
// #EXT-X-DISCONTINUITY\n
char ext_discon[22]; // 21+1
len = snprintf(ext_discon, sizeof(ext_discon), "#EXT-X-DISCONTINUITY\n");
if (::write(fd, ext_discon, len) != len) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write(ext_discon, len, NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 segment discontinuity failed. ret=%d", ret);
return ret;
}
@ -474,8 +565,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
// "#EXTINF:4294967295.208,\n"
char ext_info[25]; // 14+10+1
len = snprintf(ext_info, sizeof(ext_info), "#EXTINF:%.3f\n", segment->duration);
if (::write(fd, ext_info, len) != len) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write(ext_info, len, NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 segment info failed. ret=%d", ret);
return ret;
}
@ -484,8 +574,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
// file name
std::string filename = segment->uri;
filename += "\n";
if (::write(fd, filename.c_str(), filename.length()) != (int)filename.length()) {
ret = ERROR_HLS_WRITE_FAILED;
if ((ret = writer.write((char*)filename.c_str(), (int)filename.length(), NULL)) != ERROR_SUCCESS) {
srs_error("write m3u8 segment uri failed. ret=%d", ret);
return ret;
}
@ -493,6 +582,12 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file)
}
srs_info("write m3u8 %s success.", m3u8_file.c_str());
// notify handler for update m3u8.
if (handler && (ret = handler->on_update_m3u8(req, writer.cache())) != ERROR_SUCCESS) {
srs_error("notify handler for update m3u8 failed. ret=%d", ret);
return ret;
}
return ret;
}
@ -500,9 +595,13 @@ int SrsHlsMuxer::create_dir()
{
int ret = ERROR_SUCCESS;
if (!should_write_file) {
return ret;
}
std::string app_dir = hls_path;
app_dir += "/";
app_dir += app;
app_dir += req->app;
// TODO: cleanup the dir when startup.
@ -543,7 +642,7 @@ int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, int64_t segment
// for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase.
// open muxer
if ((ret = muxer->update_config(app, stream, hls_path, hls_fragment, hls_window)) != ERROR_SUCCESS) {
if ((ret = muxer->update_config(req, hls_path, hls_fragment, hls_window)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer update config failed. ret=%d", ret);
return ret;
}
@ -679,16 +778,18 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
return ret;
}
SrsHls::SrsHls(SrsSource* _source)
SrsHls::SrsHls(SrsSource* s, ISrsHlsHandler* h)
{
source = s;
handler = h;
hls_enabled = false;
source = _source;
codec = new SrsAvcAacCodec();
sample = new SrsCodecSample();
jitter = new SrsRtmpJitter();
muxer = new SrsHlsMuxer();
muxer = new SrsHlsMuxer(h);
hls_cache = new SrsHlsCache();
pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_HLS);

View file

@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <vector>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_file.hpp>
class SrsSharedPtrMessage;
class SrsCodecSample;
@ -53,6 +54,70 @@ class SrsFileWriter;
class SrsSimpleBuffer;
class SrsTsAacJitter;
class SrsTsCache;
class SrsHlsSegment;
/**
* the handler for hls event.
* for example, we use memory only hls for
*/
class ISrsHlsHandler
{
public:
ISrsHlsHandler();
virtual ~ISrsHlsHandler();
public:
/**
* when publish stream
*/
virtual int on_hls_publish(SrsRequest* req) = 0;
/**
* when update the m3u8 file.
*/
virtual int on_update_m3u8(SrsRequest* r, std::string m3u8) = 0;
/**
* when reap new ts file.
*/
virtual int on_update_ts(SrsRequest* r, std::string uri, std::string ts) = 0;
/**
* when unpublish stream
*/
virtual int on_hls_unpublish(SrsRequest* req) = 0;
};
/**
* write to file and cache.
*/
class SrsHlsCacheWriter : public SrsFileWriter
{
private:
SrsFileWriter impl;
std::string data;
bool should_write_cache;
bool should_write_file;
public:
SrsHlsCacheWriter(bool write_cache, bool write_file);
virtual ~SrsHlsCacheWriter();
public:
/**
* open file writer, can open then close then open...
*/
virtual int open(std::string file);
virtual void close();
public:
virtual bool is_open();
virtual int64_t tellg();
public:
/**
* write to file.
* @param pnwrite the output nb_write, NULL to ignore.
*/
virtual int write(void* buf, size_t count, ssize_t* pnwrite);
public:
/**
* get the string cache.
*/
virtual std::string cache();
};
/**
* the wrapper of m3u8 segment from specification:
@ -72,16 +137,16 @@ public:
// ts full file to write.
std::string full_path;
// the muxer to write ts.
SrsFileWriter* writer;
SrsHlsCacheWriter* writer;
SrsTSMuxer* muxer;
// current segment start dts for m3u8
int64_t segment_start_dts;
// whether current segement is sequence header.
bool is_sequence_header;
SrsHlsSegment();
public:
SrsHlsSegment(bool write_cache, bool write_file);
virtual ~SrsHlsSegment();
public:
/**
* update the segment duration.
* @current_frame_dts the dts of frame, in tbn of ts.
@ -100,8 +165,7 @@ public:
class SrsHlsMuxer
{
private:
std::string app;
std::string stream;
SrsRequest* req;
private:
std::string hls_path;
int hls_fragment;
@ -109,6 +173,10 @@ private:
private:
int _sequence_no;
std::string m3u8;
private:
ISrsHlsHandler* handler;
bool should_write_cache;
bool should_write_file;
private:
/**
* m3u8 segments.
@ -125,12 +193,15 @@ private:
*/
SrsCodecAudio acodec;
public:
SrsHlsMuxer();
SrsHlsMuxer(ISrsHlsHandler* h);
virtual ~SrsHlsMuxer();
public:
virtual int sequence_no();
public:
virtual int update_config(std::string _app, std::string _stream, std::string path, int fragment, int window);
/**
* when publish, update the config for muxer.
*/
virtual int update_config(SrsRequest* r, std::string path, int fragment, int window);
/**
* open a new segment(a new ts file),
* @param segment_start_dts use to calc the segment duration,
@ -160,7 +231,7 @@ public:
virtual int segment_close(std::string log_desc);
private:
virtual int refresh_m3u8();
virtual int _refresh_m3u8(int& fd, std::string m3u8_file);
virtual int _refresh_m3u8(std::string m3u8_file);
virtual int create_dir();
};
@ -229,6 +300,7 @@ class SrsHls
private:
SrsHlsMuxer* muxer;
SrsHlsCache* hls_cache;
ISrsHlsHandler* handler;
private:
bool hls_enabled;
SrsSource* source;
@ -251,7 +323,7 @@ private:
*/
int64_t stream_dts;
public:
SrsHls(SrsSource* _source);
SrsHls(SrsSource* s, ISrsHlsHandler* h);
virtual ~SrsHls();
public:
/**

View file

@ -379,7 +379,9 @@ int SrsGoHttpFileServer::serve_file(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
// write body.
int64_t left = length;
if ((ret = copy(w, &fs, r, left)) != ERROR_SUCCESS) {
srs_warn("read file=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret);
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read file=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret);
}
return ret;
}

View file

@ -680,12 +680,81 @@ SrsLiveEntry::SrsLiveEntry()
cache = NULL;
}
SrsHlsM3u8Stream::SrsHlsM3u8Stream()
{
}
SrsHlsM3u8Stream::~SrsHlsM3u8Stream()
{
}
void SrsHlsM3u8Stream::set_m3u8(std::string v)
{
m3u8 = v;
}
int SrsHlsM3u8Stream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
std::string data = m3u8;
w->header()->set_content_length((int)data.length());
w->header()->set_content_type("application/x-mpegURL;charset=utf-8");
if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send m3u8 failed. ret=%d", ret);
}
return ret;
}
return ret;
}
SrsHlsTsStream::SrsHlsTsStream()
{
}
SrsHlsTsStream::~SrsHlsTsStream()
{
}
void SrsHlsTsStream::set_ts(std::string v)
{
ts = v;
}
int SrsHlsTsStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
std::string data = ts;
w->header()->set_content_length((int)data.length());
w->header()->set_content_type("video/MP2T");
if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send ts failed. ret=%d", ret);
}
return ret;
}
return ret;
}
SrsHlsEntry::SrsHlsEntry()
{
}
SrsHttpServer::SrsHttpServer()
{
}
SrsHttpServer::~SrsHttpServer()
{
if (true) {
std::map<std::string, SrsLiveEntry*>::iterator it;
for (it = flvs.begin(); it != flvs.end(); ++it) {
SrsLiveEntry* entry = it->second;
@ -693,6 +762,15 @@ SrsHttpServer::~SrsHttpServer()
}
flvs.clear();
}
if (true) {
std::map<std::string, SrsHlsEntry*>::iterator it;
for (it = hls.begin(); it != hls.end(); ++it) {
SrsHlsEntry* entry = it->second;
srs_freep(entry);
}
hls.clear();
}
}
int SrsHttpServer::initialize()
{
@ -700,12 +778,17 @@ int SrsHttpServer::initialize()
// static file
// flv vod streaming.
if ((ret = mount_static_file()) != ERROR_SUCCESS) {
if ((ret = initialize_static_file()) != ERROR_SUCCESS) {
return ret;
}
// remux rtmp to flv live streaming
if ((ret = mount_flv_streaming()) != ERROR_SUCCESS) {
if ((ret = initialize_flv_streaming()) != ERROR_SUCCESS) {
return ret;
}
// remux rtmp to hls live streaming
if ((ret = initialize_hls_streaming()) != ERROR_SUCCESS) {
return ret;
}
@ -769,6 +852,128 @@ void SrsHttpServer::unmount(SrsSource* s, SrsRequest* r)
entry->stream->entry->enabled = false;
}
int SrsHttpServer::mount_hls(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
if (hls.find(r->vhost) == hls.end()) {
srs_info("ignore mount hls stream for disabled");
return ret;
}
SrsHlsEntry* entry = hls[r->vhost];
// TODO: FIXME: supports reload.
std::map<std::string, ISrsGoHttpHandler*>::iterator it;
for (it = entry->streams.begin(); it != entry->streams.end(); ++it) {
ISrsGoHttpHandler* stream = it->second;
stream->entry->enabled = true;
}
return ret;
}
int SrsHttpServer::hls_update_m3u8(SrsRequest* r, string m3u8)
{
int ret = ERROR_SUCCESS;
srs_assert(hls.find(r->vhost) != hls.end());
SrsHlsEntry* entry = hls[r->vhost];
srs_assert(entry);
std::string mount = entry->mount;
// replace the vhost variable
mount = srs_string_replace(mount, "[vhost]", r->vhost);
mount = srs_string_replace(mount, "[app]", r->app);
mount = srs_string_replace(mount, "[stream]", r->stream);
// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
if (entry->streams.find(mount) == entry->streams.end()) {
ISrsGoHttpHandler* he = new SrsHlsM3u8Stream();
entry->streams[mount] = he;
if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) {
srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret);
return ret;
}
}
// update the m3u8 stream.
SrsHlsM3u8Stream* hms = dynamic_cast<SrsHlsM3u8Stream*>(entry->streams[mount]);
if (hms) {
hms->set_m3u8(m3u8);
}
srs_trace("hls update m3u8 ok, mount=%s", mount.c_str());
return ret;
}
int SrsHttpServer::hls_update_ts(SrsRequest* r, string uri, string ts)
{
int ret = ERROR_SUCCESS;
srs_assert(hls.find(r->vhost) != hls.end());
SrsHlsEntry* entry = hls[r->vhost];
srs_assert(entry);
std::string mount = entry->mount;
// the ts is relative from the m3u8, the same start dir.
size_t pos = string::npos;
if ((pos = mount.rfind("/")) != string::npos) {
mount = mount.substr(0, pos);
}
// replace the vhost variable
mount = srs_string_replace(mount, "[vhost]", r->vhost);
mount = srs_string_replace(mount, "[app]", r->app);
// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
// mount with ts.
mount += "/";
mount += uri;
if (entry->streams.find(mount) == entry->streams.end()) {
ISrsGoHttpHandler* he = new SrsHlsTsStream();
entry->streams[mount] = he;
if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) {
srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret);
return ret;
}
}
// update the ts stream.
SrsHlsTsStream* hts = dynamic_cast<SrsHlsTsStream*>(entry->streams[mount]);
if (hts) {
hts->set_ts(ts);
}
srs_trace("hls update ts ok, mount=%s", mount.c_str());
return ret;
}
void SrsHttpServer::unmount_hls(SrsRequest* r)
{
if (hls.find(r->vhost) == hls.end()) {
srs_info("ignore unmount hls stream for disabled");
return;
}
SrsHlsEntry* entry = hls[r->vhost];
std::map<std::string, ISrsGoHttpHandler*>::iterator it;
for (it = entry->streams.begin(); it != entry->streams.end(); ++it) {
ISrsGoHttpHandler* stream = it->second;
stream->entry->enabled = false;
}
}
int SrsHttpServer::on_reload_vhost_http_updated()
{
int ret = ERROR_SUCCESS;
@ -783,7 +988,14 @@ int SrsHttpServer::on_reload_vhost_http_remux_updated()
return ret;
}
int SrsHttpServer::mount_static_file()
int SrsHttpServer::on_reload_vhost_hls(string vhost)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsHttpServer::initialize_static_file()
{
int ret = ERROR_SUCCESS;
@ -843,7 +1055,7 @@ int SrsHttpServer::mount_static_file()
return ret;
}
int SrsHttpServer::mount_flv_streaming()
int SrsHttpServer::initialize_flv_streaming()
{
int ret = ERROR_SUCCESS;
@ -872,6 +1084,40 @@ int SrsHttpServer::mount_flv_streaming()
return ret;
}
int SrsHttpServer::initialize_hls_streaming()
{
int ret = ERROR_SUCCESS;
// http hls live stream mount for each vhost.
SrsConfDirective* root = _srs_config->get_root();
for (int i = 0; i < (int)root->directives.size(); i++) {
SrsConfDirective* conf = root->at(i);
if (!conf->is_vhost()) {
continue;
}
std::string vhost = conf->arg0();
if (!_srs_config->get_hls_enabled(vhost)) {
continue;
}
std::string storage = _srs_config->get_hls_storage(vhost);
if (storage != "ram" && storage != "both") {
continue;
}
SrsHlsEntry* entry = new SrsHlsEntry();
entry->vhost = vhost;
entry->mount = _srs_config->get_hls_mount(vhost);
hls[vhost] = entry;
srs_trace("http hls live stream, vhost=%s, mount=%s",
vhost.c_str(), entry->mount.c_str());
}
return ret;
}
SrsHttpConn::SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m)
: SrsConnection(svr, fd)
{

View file

@ -260,6 +260,53 @@ struct SrsLiveEntry
SrsLiveEntry();
};
/**
* the m3u8 stream handler.
*/
class SrsHlsM3u8Stream : public ISrsGoHttpHandler
{
private:
std::string m3u8;
public:
SrsHlsM3u8Stream();
virtual ~SrsHlsM3u8Stream();
public:
virtual void set_m3u8(std::string v);
public:
virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
};
/**
* the ts stream handler.
*/
class SrsHlsTsStream : public ISrsGoHttpHandler
{
private:
std::string ts;
public:
SrsHlsTsStream();
virtual ~SrsHlsTsStream();
public:
virtual void set_ts(std::string v);
public:
virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
};
/**
* the srs hls entry.
*/
struct SrsHlsEntry
{
std::string vhost;
std::string mount;
// key: the m3u8/ts file path.
// value: the http handler.
std::map<std::string, ISrsGoHttpHandler*> streams;
SrsHlsEntry();
};
/**
* the http server instance,
* serve http static file, flv vod stream and flv live stream.
@ -270,21 +317,32 @@ public:
SrsGoHttpServeMux mux;
// the flv live streaming template.
std::map<std::string, SrsLiveEntry*> flvs;
// the hls live streaming template.
std::map<std::string, SrsHlsEntry*> hls;
public:
SrsHttpServer();
virtual ~SrsHttpServer();
public:
virtual int initialize();
// http flv/ts/mp3/aac stream
public:
virtual int mount(SrsSource* s, SrsRequest* r);
virtual void unmount(SrsSource* s, SrsRequest* r);
// hls stream
public:
virtual int mount_hls(SrsRequest* r);
virtual int hls_update_m3u8(SrsRequest* r, std::string m3u8);
virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts);
virtual void unmount_hls(SrsRequest* r);
// interface ISrsThreadHandler.
public:
virtual int on_reload_vhost_http_updated();
virtual int on_reload_vhost_http_remux_updated();
virtual int on_reload_vhost_hls(std::string vhost);
private:
virtual int mount_static_file();
virtual int mount_flv_streaming();
virtual int initialize_static_file();
virtual int initialize_flv_streaming();
virtual int initialize_hls_streaming();
};
class SrsHttpConn : public SrsConnection

View file

@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle()
// find a source to serve.
SrsSource* source = NULL;
if ((ret = SrsSource::find(req, server, &source)) != ERROR_SUCCESS) {
if ((ret = SrsSource::find(req, server, server, &source)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(source != NULL);

View file

@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <fcntl.h>
#include <algorithm>
using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
@ -1277,3 +1278,53 @@ void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r)
#endif
}
int SrsServer::on_hls_publish(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_SERVER
if ((ret = http_stream_mux->mount_hls(r)) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
int SrsServer::on_update_m3u8(SrsRequest* r, string m3u8)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_SERVER
if ((ret = http_stream_mux->hls_update_m3u8(r, m3u8)) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
int SrsServer::on_update_ts(SrsRequest* r, string uri, string ts)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_SERVER
if ((ret = http_stream_mux->hls_update_ts(r, uri, ts)) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
int SrsServer::on_hls_unpublish(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_SERVER
http_stream_mux->unmount_hls(r);
#endif
return ret;
}

View file

@ -31,11 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <vector>
#include <string>
#include <srs_app_st.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_thread.hpp>
#include <srs_app_source.hpp>
#include <srs_app_hls.hpp>
class SrsServer;
class SrsConnection;
@ -142,7 +144,7 @@ private:
* start connection service thread, destroy client.
*/
class SrsServer : virtual public ISrsReloadHandler
, virtual public ISrsSourceHandler
, virtual public ISrsSourceHandler, virtual public ISrsHlsHandler
{
private:
#ifdef SRS_AUTO_HTTP_API
@ -275,6 +277,12 @@ public:
public:
virtual int on_publish(SrsSource* s, SrsRequest* r);
virtual void on_unpublish(SrsSource* s, SrsRequest* r);
// interface ISrsHlsHandler
public:
virtual int on_hls_publish(SrsRequest* r);
virtual int on_update_m3u8(SrsRequest* r, std::string m3u8);
virtual int on_update_ts(SrsRequest* r, std::string uri, std::string ts);
virtual int on_hls_unpublish(SrsRequest* r);
};
#endif

View file

@ -713,7 +713,7 @@ ISrsSourceHandler::~ISrsSourceHandler()
std::map<std::string, SrsSource*> SrsSource::pool;
int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
@ -721,7 +721,7 @@ int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
string vhost = r->vhost;
if (pool.find(stream_url) == pool.end()) {
SrsSource* source = new SrsSource();
SrsSource* source = new SrsSource(hh);
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
@ -754,13 +754,14 @@ void SrsSource::destroy()
pool.clear();
}
SrsSource::SrsSource()
SrsSource::SrsSource(ISrsHlsHandler* hh)
{
_req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
#ifdef SRS_AUTO_HLS
hls = new SrsHls(this);
// TODO: FIXME: refine code, use subscriber pattern.
hls = new SrsHls(this, hh);
#endif
#ifdef SRS_AUTO_DVR
dvr = new SrsDvr(this);

View file

@ -61,6 +61,7 @@ class SrsDvr;
class SrsEncoder;
#endif
class SrsStream;
class ISrsHlsHandler;
/**
* the time jitter algorithm:
@ -376,9 +377,10 @@ public:
* find stream by vhost/app/stream.
* @param r the client request.
* @param h the event handler for source.
* @param hh the event handler for hls.
* @param pps the matched source, if success never be NULL.
*/
static int find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
static int find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps);
/**
* when system exit, destroy the sources,
* for gmc to analysis mem leaks.
@ -451,7 +453,7 @@ public:
* @param _req the client request object,
* this object will deep copy it for reload.
*/
SrsSource();
SrsSource(ISrsHlsHandler* hh);
virtual ~SrsSource();
// initialize, get and setter.
public:

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 111
#define VERSION_REVISION 112
// server info.
#define RTMP_SIG_SRS_KEY "SRS"