diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 96854a59a..f28d63904 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -36,6 +36,13 @@ using namespace std; #include #include #include +#include +#include +#include +#include +#include +#include +#include #define SRS_HTTP_FLV_STREAM_BUFFER 4096 @@ -84,32 +91,47 @@ void SrsAppCasterFlv::remove(SrsConnection* c) int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) { - int ret = ERROR_SUCCESS; + SrsDynamicHttpConn* conn = dynamic_cast(r->connection()); + srs_assert(conn); - srs_info("flv: handle request at %s", r->path().c_str()); + std::string app = srs_path_dirname(r->path()); + app = srs_string_trim_start(app, "/"); - char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER]; - SrsAutoFree(char, buffer); + std::string stream = srs_path_basename(r->path()); + stream = srs_string_trim_start(stream, "/"); - ISrsHttpResponseReader* rr = r->body_reader(); - while (!rr->eof()) { - int nb_read = 0; - if ((ret = rr->read(buffer, SRS_HTTP_FLV_STREAM_BUFFER, &nb_read)) != ERROR_SUCCESS) { - return ret; - } - //srs_trace("flv: read %dB from %s", nb_read, r->path().c_str()); + std::string o = output; + if (!app.empty() && app != "/") { + o = srs_string_replace(o, "[app]", app); + } + o = srs_string_replace(o, "[stream]", stream); + + // remove the extension. + if (srs_string_ends_with(o, ".flv")) { + o = o.substr(0, o.length() - 4); } - return ret; + return conn->proxy(w, r, o); } SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) : SrsHttpConn(cm, fd, m) { + + req = NULL; + io = NULL; + client = NULL; + stfd = NULL; + stream_id = 0; + + pprint = SrsPithyPrint::create_caster(); } SrsDynamicHttpConn::~SrsDynamicHttpConn() { + close(); + + srs_freep(pprint); } int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) @@ -118,4 +140,300 @@ int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) return ret; } +int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o) +{ + int ret = ERROR_SUCCESS; + + output = o; + srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str()); + + char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER]; + SrsAutoFree(char, buffer); + + ISrsHttpResponseReader* rr = r->body_reader(); + SrsHttpFileReader reader(rr); + SrsFlvDecoder dec; + + if ((ret = dec.initialize(&reader)) != ERROR_SUCCESS) { + return ret; + } + + char header[9]; + if ((ret = dec.read_header(header)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy flv header failed. ret=%d", ret); + } + return ret; + } + srs_trace("flv: proxy drop flv header."); + + char pps[4]; + if ((ret = dec.read_previous_tag_size(pps)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy flv header pps failed. ret=%d", ret); + } + return ret; + } + + while (!rr->eof()) { + pprint->elapse(); + + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + + char type; + int32_t size; + u_int32_t time; + if ((ret = dec.read_tag_header(&type, &size, &time)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy tag header failed. ret=%d", ret); + } + return ret; + } + + char* data = new char[size]; + if ((ret = dec.read_tag_data(data, size)) != ERROR_SUCCESS) { + srs_freep(data); + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy tag data failed. ret=%d", ret); + } + return ret; + } + + if ((ret = rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy rtmp packet failed. ret=%d", ret); + } + return ret; + } + + if ((ret = dec.read_previous_tag_size(pps)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("flv: proxy tag header pps failed. ret=%d", ret); + } + return ret; + } + } + + return ret; +} + +int SrsDynamicHttpConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = NULL; + + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { + srs_error("flv: create shared ptr msg failed. ret=%d", ret); + return ret; + } + srs_assert(msg); + + if (pprint->can_print()) { + srs_trace("flv: send msg %s age=%d, dts=%"PRId64", size=%d", + msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size); + } + + // send out encoded msg. + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDynamicHttpConn::connect() +{ + int ret = ERROR_SUCCESS; + + // when ok, ignore. + // TODO: FIXME: should reconnect when disconnected. + if (io || client) { + return ret; + } + + // parse uri + if (!req) { + req = new SrsRequest(); + + size_t pos = string::npos; + string uri = req->tcUrl = output; + + // tcUrl, stream + if ((pos = uri.rfind("/")) != string::npos) { + req->stream = uri.substr(pos + 1); + req->tcUrl = uri = uri.substr(0, pos); + } + + srs_discovery_tc_url(req->tcUrl, + req->schema, req->host, req->vhost, req->app, req->port, + req->param); + } + + // connect host. + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret); + return ret; + } + io = new SrsStSocket(stfd); + client = new SrsRtmpClient(io); + + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); + + // connect to vhost/app + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("mpegts: handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + // publish. + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + return ret; +} + +// TODO: FIXME: refine the connect_app. +int SrsDynamicHttpConn::connect_app(string ep_server, string ep_port) +{ + int ret = ERROR_SUCCESS; + + // args of request takes the srs info. + if (req->args == NULL) { + req->args = SrsAmf0Any::object(); + } + + // notify server the edge identity, + // @see https://github.com/simple-rtmp-server/srs/issues/147 + SrsAmf0Object* data = req->args; + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); + // for edge to directly get the id of client. + data->set("srs_pid", SrsAmf0Any::number(getpid())); + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); + + // local ip of edge + std::vector ips = srs_get_local_ipv4_ips(); + assert(_srs_config->get_stats_network() < (int)ips.size()); + std::string local_ip = ips[_srs_config->get_stats_network()]; + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); + + // generate the tcUrl + std::string param = ""; + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); + + // upnode server identity will show in the connect_app of client. + // @see https://github.com/simple-rtmp-server/srs/issues/160 + // the debug_srs_upnode is config in vhost and default to true. + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d", + tc_url.c_str(), debug_srs_upnode, ret); + return ret; + } + + return ret; +} + +void SrsDynamicHttpConn::close() +{ + srs_freep(client); + srs_freep(io); + srs_freep(req); + srs_close_stfd(stfd); +} + +SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) +{ + http = h; +} + +SrsHttpFileReader::~SrsHttpFileReader() +{ +} + +int SrsHttpFileReader::open(std::string /*file*/) +{ + return ERROR_SUCCESS; +} + +void SrsHttpFileReader::close() +{ +} + +bool SrsHttpFileReader::is_open() +{ + return true; +} + +int64_t SrsHttpFileReader::tellg() +{ + return 0; +} + +void SrsHttpFileReader::skip(int64_t /*size*/) +{ +} + +int64_t SrsHttpFileReader::lseek(int64_t offset) +{ + return offset; +} + +int64_t SrsHttpFileReader::filesize() +{ + return 0; +} + +int SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) +{ + int ret = ERROR_SUCCESS; + + if (http->eof()) { + ret = ERROR_HTTP_REQUEST_EOF; + srs_error("flv: encoder EOF. ret=%d", ret); + return ret; + } + + int total_read = 0; + while (total_read < count) { + int nread = 0; + if ((ret = http->read((char*)buf + total_read, (int)(count - total_read), &nread)) != ERROR_SUCCESS) { + return ret; + } + + srs_assert(nread); + total_read += nread; + } + + if (pnread) { + *pnread = total_read; + } + + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index fc4edd83b..ac8987fd2 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -38,13 +38,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsConfDirective; class SrsHttpServeMux; class SrsHttpConn; +class SrsRtmpClient; +class SrsStSocket; +class SrsRequest; +class SrsPithyPrint; #include #include #include #include #include +#include +/** + * the stream caster for flv stream over HTTP POST. + */ class SrsAppCasterFlv : virtual public ISrsTcpHandler , virtual public IConnectionManager, virtual public ISrsHttpHandler { @@ -68,13 +76,68 @@ public: virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r); }; +/** + * the dynamic http connection, never drop the body. + */ class SrsDynamicHttpConn : public SrsHttpConn { +private: + std::string output; + SrsPithyPrint* pprint; +private: + SrsRequest* req; + st_netfd_t stfd; + SrsStSocket* io; + SrsRtmpClient* client; + int stream_id; public: SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); virtual ~SrsDynamicHttpConn(); public: virtual int on_got_http_message(SrsHttpMessage* msg); +public: + virtual int proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o); +private: + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); +private: + // connect to rtmp output url. + // @remark ignore when not connected, reconnect when disconnected. + virtual int connect(); + virtual int connect_app(std::string ep_server, std::string ep_port); + // close the connected io and rtmp to ready to be re-connect. + virtual void close(); +}; + +/** + * the http wrapper for file reader, + * to read http post stream like a file. + */ +class SrsHttpFileReader : public SrsFileReader +{ +private: + ISrsHttpResponseReader* http; +public: + SrsHttpFileReader(ISrsHttpResponseReader* h); + virtual ~SrsHttpFileReader(); +public: + /** + * open file reader, can open then close then open... + */ + virtual int open(std::string file); + virtual void close(); +public: + // TODO: FIXME: extract interface. + virtual bool is_open(); + virtual int64_t tellg(); + virtual void skip(int64_t size); + virtual int64_t lseek(int64_t offset); + virtual int64_t filesize(); +public: + /** + * read from file. + * @param pnread the output nb_read, NULL to ignore. + */ + virtual int read(void* buf, size_t count, ssize_t* pnread); }; #endif diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 27979de4f..4ed1453f2 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) return ret; } -SrsHttpMessage::SrsHttpMessage(SrsStSocket* io) +SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c) { + conn = c; chunked = false; _uri = new SrsHttpUri(); _body = new SrsHttpResponseReader(this, io); @@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer() return _http_ts_send_buffer; } +SrsConnection* SrsHttpMessage::connection() +{ + return conn; +} + u_int8_t SrsHttpMessage::method() { return (u_int8_t)_header.method; @@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri() { std::string uri = _uri->get_schema(); if (uri.empty()) { - uri += "http://"; + uri += "http"; } + uri += "://"; uri += host(); uri += path(); @@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type) return ret; } -int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) +int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg) { *ppmsg = NULL; @@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) } // create msg - SrsHttpMessage* msg = new SrsHttpMessage(skt); + SrsHttpMessage* msg = new SrsHttpMessage(skt, conn); // initalize http msg, parse url. if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index 90344751b..190999b50 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -50,6 +50,7 @@ class SrsSimpleBuffer; class SrsHttpMuxEntry; class ISrsHttpResponseWriter; class SrsFastBuffer; +class SrsConnection; // http specification // CR = @@ -506,8 +507,10 @@ private: std::vector _headers; // the query map std::map _query; + // the transport connection, can be NULL. + SrsConnection* conn; public: - SrsHttpMessage(SrsStSocket* io); + SrsHttpMessage(SrsStSocket* io, SrsConnection* c); virtual ~SrsHttpMessage(); public: /** @@ -518,6 +521,7 @@ public: ); public: virtual char* http_ts_send_buffer(); + virtual SrsConnection* connection(); public: virtual u_int8_t method(); virtual u_int16_t status_code(); @@ -617,7 +621,7 @@ public: * or error and *ppmsg must be NULL. * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). */ - virtual int parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg); + virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg); private: /** * parse the HTTP message to member field: msg. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 0f3aac193..d4e3ea5fc 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle() SrsHttpMessage* req = NULL; // get a http message - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index a36ef69e1..377f9a79b 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg) } SrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } @@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg) } SrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 21eb95511..bdf8e2dc0 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle() SrsHttpMessage* req = NULL; // get a http message - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index bb2cf05d8..bfcdad6b6 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -255,6 +255,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 #define ERROR_AVC_NALU_UEV 4027 #define ERROR_AAC_BYTES_INVALID 4028 +#define ERROR_HTTP_REQUEST_EOF 4029 /////////////////////////////////////////////////////// // user-define error.