1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refine rtmp protocol stack, refer to go.rtmp

This commit is contained in:
winlin 2014-04-29 13:39:16 +08:00
parent 2e5337a26e
commit ebf6203c3b
17 changed files with 1583 additions and 261 deletions

View file

@ -472,8 +472,8 @@ int SrsRtmpConn::playing(SrsSource* source)
// read from client.
int ctl_msg_ret = ERROR_SUCCESS;
if (true) {
SrsCommonMessage* msg = NULL;
ctl_msg_ret = ret = rtmp->recv_message(&msg);
__SrsMessage* msg = NULL;
ctl_msg_ret = ret = rtmp->__recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
@ -491,7 +491,7 @@ int SrsRtmpConn::playing(SrsSource* source)
}
// get messages from consumer.
SrsSharedPtrMessage** msgs = NULL;
__SrsSharedPtrMessage** msgs = NULL;
int count = 0;
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
@ -510,11 +510,11 @@ int SrsRtmpConn::playing(SrsSource* source)
srs_verbose("no packets in queue.");
continue;
}
SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
SrsAutoFree(__SrsSharedPtrMessage*, msgs, true);
// sendout messages
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
__SrsSharedPtrMessage* msg = msgs[i];
// the send_message will free the msg,
// so set the msgs[i] to NULL.
@ -530,7 +530,7 @@ int SrsRtmpConn::playing(SrsSource* source)
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_message(msg)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret);
return ret;
}
@ -573,14 +573,13 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
// switch to other st-threads.
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("fmle recv identify client message failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
pithy_print.elapse();
@ -594,12 +593,14 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("fmle decode unpublish message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsPacket, pkt, false);
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {
@ -647,15 +648,15 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
// switch to other st-threads.
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("flash recv identify client message failed. ret=%d", ret);
}
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
pithy_print.elapse();
@ -669,11 +670,14 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("flash decode unpublish message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsPacket, pkt, false);
// flash unpublish.
// TODO: maybe need to support republish.
srs_trace("flash flash publish finished.");
@ -690,7 +694,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
return ret;
}
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, bool vhost_is_edge)
{
int ret = ERROR_SUCCESS;
@ -720,12 +724,13 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsPacket, pkt, false);
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
@ -743,7 +748,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
return ret;
}
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, __SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -751,29 +756,32 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
srs_verbose("ignore all empty message.");
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_info("ignore all message except amf0/amf3 command.");
return ret;
}
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret);
return ret;
}
srs_info("decode the amf0/amf3 command packet success.");
SrsAutoFree(SrsPacket, pkt, false);
// for jwplayer/flowplayer, which send close as pause message.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/6
SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(msg->get_packet());
SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);
if (close) {
ret = ERROR_CONTROL_RTMP_CLOSE;
srs_trace("system control message: rtmp close stream. ret=%d", ret);
return ret;
}
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(msg->get_packet());
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
if (!pause) {
srs_info("ignore all amf0/amf3 command except pause.");
return ret;