1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

support FMLE publish streaming

This commit is contained in:
winlin 2013-10-22 16:44:10 +08:00
parent 33c586181d
commit ed371d6e7e
5 changed files with 234 additions and 80 deletions

View file

@ -28,15 +28,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_error.hpp> #include <srs_core_error.hpp>
#include <srs_core_log.hpp> #include <srs_core_log.hpp>
#include <srs_core_rtmp.hpp> #include <srs_core_rtmp.hpp>
#include <srs_core_protocol.hpp>
// default stream id for response the createStream request. #include <srs_core_auto_free.hpp>
#define SRS_DEFAULT_SID 1
SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd) : SrsConnection(srs_server, client_stfd)
{ {
ip = NULL; ip = NULL;
req = new SrsRequest(); req = new SrsRequest();
res = new SrsResponse();
rtmp = new SrsRtmp(client_stfd); rtmp = new SrsRtmp(client_stfd);
} }
@ -44,6 +44,7 @@ SrsClient::~SrsClient()
{ {
srs_freepa(ip); srs_freepa(ip);
srs_freep(req); srs_freep(req);
srs_freep(res);
srs_freep(rtmp); srs_freep(rtmp);
} }
@ -97,14 +98,13 @@ int SrsClient::do_cycle()
} }
srs_verbose("on_bw_done success"); srs_verbose("on_bw_done success");
int stream_id = SRS_DEFAULT_SID;
SrsClientType type; SrsClientType type;
if ((ret = rtmp->identify_client(stream_id, type, req->stream)) != ERROR_SUCCESS) { if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) {
srs_error("identify client failed. ret=%d", ret); srs_error("identify client failed. ret=%d", ret);
return ret; return ret;
} }
srs_verbose("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); srs_verbose("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
// TODO: read from config. // TODO: read from config.
int chunk_size = 4096; int chunk_size = 4096;
if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
@ -117,13 +117,23 @@ int SrsClient::do_cycle()
case SrsClientPlay: { case SrsClientPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str()); srs_verbose("start to play stream %s.", req->stream.c_str());
if ((ret = rtmp->start_play(stream_id)) != ERROR_SUCCESS) { if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to play stream failed. ret=%d", ret); srs_error("start to play stream failed. ret=%d", ret);
return ret; return ret;
} }
srs_info("start to play stream %s success", req->stream.c_str()); srs_info("start to play stream %s success", req->stream.c_str());
return streaming_play(); return streaming_play();
} }
case SrsClientPublish: {
srs_verbose("start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
srs_info("start to publish stream %s success", req->stream.c_str());
return streaming_publish();
}
default: { default: {
ret = ERROR_SYSTEM_CLIENT_INVALID; ret = ERROR_SYSTEM_CLIENT_INVALID;
srs_info("invalid client type=%d. ret=%d", type, ret); srs_info("invalid client type=%d. ret=%d", type, ret);
@ -140,6 +150,39 @@ int SrsClient::streaming_play()
return ret; return ret;
} }
int SrsClient::streaming_publish()
{
int ret = ERROR_SUCCESS;
while (true) {
SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv identify client message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsMessage, msg, false);
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
srs_error("decode unpublish message failed. ret=%d", ret);
return ret;
}
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);
}
srs_trace("ignore AMF0/AMF3 command message.");
}
}
return ret;
}
int SrsClient::get_peer_ip() int SrsClient::get_peer_ip()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;

View file

@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsRtmp; class SrsRtmp;
class SrsRequest; class SrsRequest;
class SrsResponse;
/** /**
* the client provides the main logic control for RTMP clients. * the client provides the main logic control for RTMP clients.
@ -43,6 +44,7 @@ class SrsClient : public SrsConnection
private: private:
char* ip; char* ip;
SrsRequest* req; SrsRequest* req;
SrsResponse* res;
SrsRtmp* rtmp; SrsRtmp* rtmp;
public: public:
SrsClient(SrsServer* srs_server, st_netfd_t client_stfd); SrsClient(SrsServer* srs_server, st_netfd_t client_stfd);
@ -51,6 +53,7 @@ protected:
virtual int do_cycle(); virtual int do_cycle();
private: private:
virtual int streaming_play(); virtual int streaming_play();
virtual int streaming_publish();
virtual int get_peer_ip(); virtual int get_peer_ip();
}; };

View file

@ -199,6 +199,7 @@ messages.
#define RTMP_AMF0_COMMAND_RESULT "_result" #define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" #define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" #define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish"
#define RTMP_AMF0_COMMAND_PUBLISH "publish" #define RTMP_AMF0_COMMAND_PUBLISH "publish"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess" #define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
@ -979,6 +980,10 @@ int SrsMessage::decode_packet()
srs_info("decode the AMF0/AMF3 command(publish message)."); srs_info("decode the AMF0/AMF3 command(publish message).");
packet = new SrsPublishPacket(); packet = new SrsPublishPacket();
return packet->decode(stream); return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 command(unpublish message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} }
// default packet to drop message. // default packet to drop message.
@ -1383,7 +1388,8 @@ int SrsFMLEStartPacket::decode(SrsStream* stream)
} }
if (command_name.empty() if (command_name.empty()
|| (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM
&& command_name != RTMP_AMF0_COMMAND_FC_PUBLISH) && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
&& command_name != RTMP_AMF0_COMMAND_UNPUBLISH)
) { ) {
ret = ERROR_RTMP_AMF0_DECODE; ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode FMLE start command_name failed. " srs_error("amf0 decode FMLE start command_name failed. "

View file

@ -53,9 +53,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define StatusCodeStreamStart "NetStream.Play.Start" #define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodePublishStart "NetStream.Publish.Start" #define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start" #define StatusCodeDataStart "NetStream.Data.Start"
#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
// FMLE // FMLE
#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish" #define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"
// default stream id for response the createStream request.
#define SRS_DEFAULT_SID 1
SrsRequest::SrsRequest() SrsRequest::SrsRequest()
{ {
@ -107,6 +112,15 @@ int SrsRequest::discovery_app()
return ret; return ret;
} }
SrsResponse::SrsResponse()
{
stream_id = SRS_DEFAULT_SID;
}
SrsResponse::~SrsResponse()
{
}
SrsRtmp::SrsRtmp(st_netfd_t client_stfd) SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
{ {
protocol = new SrsProtocol(client_stfd); protocol = new SrsProtocol(client_stfd);
@ -164,6 +178,11 @@ int SrsRtmp::handshake()
return ret; return ret;
} }
int SrsRtmp::recv_message(SrsMessage** pmsg)
{
return protocol->recv_message(pmsg);
}
int SrsRtmp::connect_app(SrsRequest* req) int SrsRtmp::connect_app(SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -329,7 +348,7 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) { if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
srs_info("identify client by releaseStream, fmle publish."); srs_info("identify client by releaseStream, fmle publish.");
return identify_fmle_publish_client( return identify_fmle_publish_client(
dynamic_cast<SrsFMLEStartPacket*>(pkt), stream_id, type, stream_name); dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
} }
srs_trace("ignore AMF0/AMF3 command message."); srs_trace("ignore AMF0/AMF3 command message.");
@ -452,79 +471,10 @@ int SrsRtmp::start_play(int stream_id)
return ret; return ret;
} }
int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) int SrsRtmp::start_publish(int stream_id)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (true) {
SrsMessage* msg = new SrsMessage();
SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send createStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send createStream response message success.");
}
while (true) {
SrsMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv identify client message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsMessage, msg, false);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_trace("identify ignore messages except "
"AMF0/AMF3 command message. type=%#x", msg->header.message_type);
continue;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
srs_error("identify decode message failed. ret=%d", ret);
return ret;
}
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
type = SrsClientPlay;
stream_name = play->stream_name;
srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
return ret;
}
srs_trace("ignore AMF0/AMF3 command message.");
}
return ret;
}
int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
{
int ret = ERROR_SUCCESS;
type = SrsClientPublish;
stream_name = req->stream_name;
// createStream response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send releaseStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send releaseStream response message success.");
}
// FCPublish // FCPublish
double fc_publish_tid = 0; double fc_publish_tid = 0;
if (true) { if (true) {
@ -629,6 +579,84 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
srs_info("send onStatus(NetStream.Publish.Start) message success."); srs_info("send onStatus(NetStream.Publish.Start) message success.");
} }
return ret;
}
int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid)
{
int ret = ERROR_SUCCESS;
// publish response onFCUnpublish(NetStream.unpublish.Success)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream."));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret);
return ret;
}
srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success.");
}
// FCUnpublish response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FCUnpublish response message failed. ret=%d", ret);
return ret;
}
srs_info("send FCUnpublish response message success.");
}
// publish response onStatus(NetStream.Unpublish.Success)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished"));
pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Unpublish.Success) message success.");
}
srs_info("FMLE unpublish success.");
return ret;
}
int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
{
int ret = ERROR_SUCCESS;
if (true) {
SrsMessage* msg = new SrsMessage();
SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send createStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send createStream response message success.");
}
while (true) { while (true) {
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
@ -637,6 +665,52 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
} }
SrsAutoFree(SrsMessage, msg, false); SrsAutoFree(SrsMessage, msg, false);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_trace("identify ignore messages except "
"AMF0/AMF3 command message. type=%#x", msg->header.message_type);
continue;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
srs_error("identify decode message failed. ret=%d", ret);
return ret;
}
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
type = SrsClientPlay;
stream_name = play->stream_name;
srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
return ret;
}
srs_trace("ignore AMF0/AMF3 command message.");
}
return ret;
}
int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name)
{
int ret = ERROR_SUCCESS;
type = SrsClientPublish;
stream_name = req->stream_name;
// releaseStream response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send releaseStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send releaseStream response message success.");
} }
return ret; return ret;

View file

@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <st.h> #include <st.h>
class SrsProtocol; class SrsProtocol;
class SrsMessage;
class SrsCreateStreamPacket; class SrsCreateStreamPacket;
class SrsFMLEStartPacket; class SrsFMLEStartPacket;
@ -63,6 +64,17 @@ struct SrsRequest
virtual int discovery_app(); virtual int discovery_app();
}; };
/**
* the response to client.
*/
struct SrsResponse
{
int stream_id;
SrsResponse();
virtual ~SrsResponse();
};
/** /**
* the rtmp client type. * the rtmp client type.
*/ */
@ -88,6 +100,7 @@ public:
virtual ~SrsRtmp(); virtual ~SrsRtmp();
public: public:
virtual int handshake(); virtual int handshake();
virtual int recv_message(SrsMessage** pmsg);
virtual int connect_app(SrsRequest* req); virtual int connect_app(SrsRequest* req);
virtual int set_window_ack_size(int ack_size); virtual int set_window_ack_size(int ack_size);
/** /**
@ -116,9 +129,24 @@ public:
* onStatus(NetStream.Data.Start). * onStatus(NetStream.Data.Start).
*/ */
virtual int start_play(int stream_id); virtual int start_play(int stream_id);
/**
* when client type is publish, response with packets:
* releaseStream response
* FCPublish
* FCPublish response
* createStream response
* onFCPublish(NetStream.Publish.Start)
* onStatus(NetStream.Publish.Start)
*/
virtual int start_publish(int stream_id);
/**
* process the FMLE unpublish event.
* @unpublish_tid the unpublish request transaction id.
*/
virtual int fmle_unpublish(int stream_id, double unpublish_tid);
private: private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
}; };
#endif #endif