1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add forward framework

This commit is contained in:
winlin 2013-11-29 16:52:24 +08:00
parent 2fa7610f9c
commit f656087d74
7 changed files with 376 additions and 39 deletions

View file

@ -108,7 +108,7 @@ void SrsForwarder::on_unpublish()
srs_freep(client); srs_freep(client);
} }
int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata) int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
return ret; return ret;
@ -222,8 +222,13 @@ int SrsForwarder::forward_cycle_imp()
srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
return ret; return ret;
} }
if ((ret = client->play_stream(stream_name, stream_id)) != ERROR_SUCCESS) { if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_name=%s. ret=%d", stream_name.c_str(), ret); srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}
if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
stream_name.c_str(), stream_id, ret);
return ret; return ret;
} }

View file

@ -61,7 +61,7 @@ public:
public: public:
virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server); virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server);
virtual void on_unpublish(); virtual void on_unpublish();
virtual int on_meta_data(SrsOnMetaDataPacket* metadata); virtual int on_meta_data(SrsSharedPtrMessage* metadata);
virtual int on_audio(SrsSharedPtrMessage* msg); virtual int on_audio(SrsSharedPtrMessage* msg);
virtual int on_video(SrsSharedPtrMessage* msg); virtual int on_video(SrsSharedPtrMessage* msg);
private: private:

View file

@ -641,6 +641,15 @@ int SrsProtocol::on_send_message(ISrsMessage* msg)
pkt = dynamic_cast<SrsConnectAppPacket*>(common_msg->get_packet()); pkt = dynamic_cast<SrsConnectAppPacket*>(common_msg->get_packet());
if (pkt) { if (pkt) {
requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT; requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT;
break;
}
}
if (true) {
SrsCreateStreamPacket* pkt = NULL;
pkt = dynamic_cast<SrsCreateStreamPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CREATE_STREAM;
break;
} }
} }
break; break;
@ -1246,6 +1255,10 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_info("decode the AMF0/AMF3 response command(connect vhost/app message)."); srs_info("decode the AMF0/AMF3 response command(connect vhost/app message).");
packet = new SrsConnectAppResPacket(); packet = new SrsConnectAppResPacket();
return packet->decode(stream); return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 response command(createStream message).");
packet = new SrsCreateStreamResPacket(0, 0);
return packet->decode(stream);
} else { } else {
ret = ERROR_RTMP_NO_REQUEST; ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. " srs_error("decode AMF0/AMF3 request failed. "
@ -1841,6 +1854,49 @@ int SrsCreateStreamPacket::decode(SrsStream* stream)
return ret; return ret;
} }
int SrsCreateStreamPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
}
int SrsCreateStreamPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsCreateStreamPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size();
}
int SrsCreateStreamPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
srs_info("encode create stream request packet success.");
return ret;
}
SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id) SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id)
{ {
command_name = RTMP_AMF0_COMMAND_RESULT; command_name = RTMP_AMF0_COMMAND_RESULT;
@ -1854,6 +1910,41 @@ SrsCreateStreamResPacket::~SrsCreateStreamResPacket()
srs_freep(command_object); srs_freep(command_object);
} }
int SrsCreateStreamResPacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode createStream command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, stream_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode createStream stream_id failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode createStream response packet success");
return ret;
}
int SrsCreateStreamResPacket::get_perfer_cid() int SrsCreateStreamResPacket::get_perfer_cid()
{ {
return RTMP_CID_OverConnection; return RTMP_CID_OverConnection;
@ -2072,6 +2163,62 @@ int SrsPublishPacket::decode(SrsStream* stream)
return ret; return ret;
} }
int SrsPublishPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsPublishPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsPublishPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name)
+ srs_amf0_get_string_size(type);
}
int SrsPublishPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("encode stream_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode stream_name success.");
if ((ret = srs_amf0_write_string(stream, type)) != ERROR_SUCCESS) {
srs_error("encode type failed. ret=%d", ret);
return ret;
}
srs_verbose("encode type success.");
srs_info("encode play request packet success.");
return ret;
}
SrsPausePacket::SrsPausePacket() SrsPausePacket::SrsPausePacket()
{ {
command_name = RTMP_AMF0_COMMAND_PAUSE; command_name = RTMP_AMF0_COMMAND_PAUSE;
@ -2191,6 +2338,75 @@ int SrsPlayPacket::decode(SrsStream* stream)
return ret; return ret;
} }
int SrsPlayPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsPlayPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsPlayPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name)
+ srs_amf0_get_number_size() + srs_amf0_get_number_size()
+ srs_amf0_get_boolean_size();
}
int SrsPlayPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("encode stream_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode stream_name success.");
if ((ret = srs_amf0_write_number(stream, start)) != ERROR_SUCCESS) {
srs_error("encode start failed. ret=%d", ret);
return ret;
}
srs_verbose("encode start success.");
if ((ret = srs_amf0_write_number(stream, duration)) != ERROR_SUCCESS) {
srs_error("encode duration failed. ret=%d", ret);
return ret;
}
srs_verbose("encode duration success.");
if ((ret = srs_amf0_write_boolean(stream, reset)) != ERROR_SUCCESS) {
srs_error("encode reset failed. ret=%d", ret);
return ret;
}
srs_verbose("encode reset success.");
srs_info("encode play request packet success.");
return ret;
}
SrsPlayResPacket::SrsPlayResPacket() SrsPlayResPacket::SrsPlayResPacket()
{ {
command_name = RTMP_AMF0_COMMAND_RESULT; command_name = RTMP_AMF0_COMMAND_RESULT;

View file

@ -552,6 +552,13 @@ public:
virtual ~SrsCreateStreamPacket(); virtual ~SrsCreateStreamPacket();
public: public:
virtual int decode(SrsStream* stream); virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
}; };
/** /**
* response for SrsCreateStreamPacket. * response for SrsCreateStreamPacket.
@ -573,6 +580,8 @@ public:
public: public:
SrsCreateStreamResPacket(double _transaction_id, double _stream_id); SrsCreateStreamResPacket(double _transaction_id, double _stream_id);
virtual ~SrsCreateStreamResPacket(); virtual ~SrsCreateStreamResPacket();
public:
virtual int decode(SrsStream* stream);
public: public:
virtual int get_perfer_cid(); virtual int get_perfer_cid();
public: public:
@ -662,6 +671,13 @@ public:
virtual ~SrsPublishPacket(); virtual ~SrsPublishPacket();
public: public:
virtual int decode(SrsStream* stream); virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
}; };
/** /**
@ -717,6 +733,13 @@ public:
virtual ~SrsPlayPacket(); virtual ~SrsPlayPacket();
public: public:
virtual int decode(SrsStream* stream); virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
}; };
/** /**
* response for SrsPlayPacket. * response for SrsPlayPacket.
@ -1050,6 +1073,7 @@ protected:
return CLASS_NAME_STRING(SrsUserControlPacket); return CLASS_NAME_STRING(SrsUserControlPacket);
} }
public: public:
// @see: SrcPCUCEventType
int16_t event_type; int16_t event_type;
int32_t event_data; int32_t event_data;
/** /**

View file

@ -252,12 +252,99 @@ int SrsRtmpClient::connect_app(std::string app, std::string tc_url)
return ret; return ret;
} }
int SrsRtmpClient::play_stream(std::string stream, int& stream_id) int SrsRtmpClient::create_stream(int& stream_id)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// CreateStream // CreateStream
if (true) { if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
return ret;
}
}
// CreateStream _result.
if (true) {
SrsCommonMessage* msg = NULL;
SrsCreateStreamResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect create stream response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get create stream response message");
stream_id = (int)pkt->stream_id;
}
return ret;
}
int SrsRtmpClient::play(std::string stream, int stream_id)
{
int ret = ERROR_SUCCESS;
// Play(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPlayPacket* pkt = new SrsPlayPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send play stream failed. "
"stream=%s, stream_id=%d, ret=%d",
stream.c_str(), stream_id, ret);
return ret;
}
}
// SetBufferLength(1000ms)
int buffer_length_ms = 1000;
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCSetBufferLength;
pkt->event_data = stream_id;
pkt->extra_data = buffer_length_ms;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send set buffer length failed. "
"stream=%s, stream_id=%d, bufferLength=%d, ret=%d",
stream.c_str(), stream_id, buffer_length_ms, ret);
return ret;
}
}
return ret;
}
int SrsRtmpClient::publish(std::string stream, int stream_id)
{
int ret = ERROR_SUCCESS;
// publish(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPublishPacket* pkt = new SrsPublishPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send publish message failed. "
"stream=%s, stream_id=%d, ret=%d",
stream.c_str(), stream_id, ret);
return ret;
}
} }
return ret; return ret;

View file

@ -111,7 +111,9 @@ public:
public: public:
virtual int handshake(); virtual int handshake();
virtual int connect_app(std::string app, std::string tc_url); virtual int connect_app(std::string app, std::string tc_url);
virtual int play_stream(std::string stream, int& stream_id); virtual int create_stream(int& stream_id);
virtual int play(std::string stream, int stream_id);
virtual int publish(std::string stream, int stream_id);
}; };
/** /**

View file

@ -409,17 +409,6 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
return ret; return ret;
} }
#endif #endif
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
}
}
metadata->metadata->set("server", new SrsAmf0String( metadata->metadata->set("server", new SrsAmf0String(
RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
@ -477,6 +466,18 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
srs_trace("dispatch metadata success."); srs_trace("dispatch metadata success.");
} }
// copy to all forwarders
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
}
}
return ret; return ret;
} }
@ -498,17 +499,6 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
return ret; return ret;
} }
#endif #endif
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {
srs_error("forwarder process audio message failed. ret=%d", ret);
return ret;
}
}
}
// copy to all consumer // copy to all consumer
if (true) { if (true) {
@ -523,6 +513,18 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
srs_info("dispatch audio success."); srs_info("dispatch audio success.");
} }
// copy to all forwarders.
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {
srs_error("forwarder process audio message failed. ret=%d", ret);
return ret;
}
}
}
// cache the sequence header if h264 // cache the sequence header if h264
if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) { if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) {
srs_freep(cache_sh_audio); srs_freep(cache_sh_audio);
@ -559,17 +561,6 @@ int SrsSource::on_video(SrsCommonMessage* video)
return ret; return ret;
} }
#endif #endif
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) {
srs_error("forwarder process video message failed. ret=%d", ret);
return ret;
}
}
}
// copy to all consumer // copy to all consumer
if (true) { if (true) {
@ -584,6 +575,18 @@ int SrsSource::on_video(SrsCommonMessage* video)
srs_info("dispatch video success."); srs_info("dispatch video success.");
} }
// copy to all forwarders.
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) {
srs_error("forwarder process video message failed. ret=%d", ret);
return ret;
}
}
}
// cache the sequence header if h264 // cache the sequence header if h264
if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) { if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) {
srs_freep(cache_sh_video); srs_freep(cache_sh_video);