1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refine RTMP protocol completed, to 0.9.81

This commit is contained in:
winlin 2014-04-29 14:44:07 +08:00
parent be4c182d11
commit 80ddddf823
20 changed files with 343 additions and 353 deletions

View file

@ -189,21 +189,21 @@ int SrsBandwidth::do_bandwidth_check()
pkt->data->set("publish_bytes", SrsAmf0Any::number(publish_bytes));
pkt->data->set("publish_time", SrsAmf0Any::number(publish_actual_duration_ms));
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret;
}
// if flash, we notice the result, and expect a final packet.
while (true) {
__SrsMessage* msg = NULL;
SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
// info level to ignore and return success.
srs_info("expect final message failed. ret=%d", ret);
return ERROR_SUCCESS;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get final message success.");
@ -233,7 +233,7 @@ int SrsBandwidth::check_play(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret;
}
@ -242,13 +242,13 @@ int SrsBandwidth::check_play(
while (true) {
// recv client's starting play response
__SrsMessage* msg = NULL;
SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
@ -281,10 +281,10 @@ int SrsBandwidth::check_play(
}
data_count += 2;
// TODO: FIXME: get length from the rtmp protocol stack.
play_bytes += pkt->get_payload_length();
// get length from the rtmp protocol stack.
play_bytes = rtmp->get_send_bytes();
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
@ -314,7 +314,7 @@ int SrsBandwidth::check_play(
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(play_bytes));
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret;
}
@ -323,13 +323,13 @@ int SrsBandwidth::check_play(
while (true) {
// recv client's stop play response.
__SrsMessage* msg = NULL;
SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
@ -357,7 +357,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret;
}
@ -366,13 +366,13 @@ int SrsBandwidth::check_publish(
while (true) {
// read client's notification of starting publish
__SrsMessage* msg = NULL;
SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
@ -387,15 +387,14 @@ int SrsBandwidth::check_publish(
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(0);
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
// TODO: FIXME.
publish_bytes += msg->header.payload_length;
publish_bytes = rtmp->get_recv_bytes();
int kbps = 0;
while (true) {
@ -420,7 +419,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(publish_bytes));
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret;
}
@ -435,13 +434,13 @@ int SrsBandwidth::check_publish(
// TODO: FIXME: check whether flash client.
while (false) {
// recv client's stop publish response.
__SrsMessage* msg = NULL;
SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");

View file

@ -455,7 +455,7 @@ int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata)
return ret;
}
int SrsDvrPlan::on_audio(__SrsSharedPtrMessage* audio)
int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio)
{
int ret = ERROR_SUCCESS;
@ -481,7 +481,7 @@ int SrsDvrPlan::on_audio(__SrsSharedPtrMessage* audio)
return ret;
}
int SrsDvrPlan::on_video(__SrsSharedPtrMessage* video)
int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
{
int ret = ERROR_SUCCESS;
@ -573,7 +573,7 @@ int SrsDvrPlan::flv_close()
return ret;
}
int SrsDvrPlan::update_duration(__SrsSharedPtrMessage* msg)
int SrsDvrPlan::update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -720,7 +720,7 @@ void SrsDvrSegmentPlan::on_unpublish()
dvr_enabled = false;
}
int SrsDvrSegmentPlan::update_duration(__SrsSharedPtrMessage* msg)
int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -903,7 +903,7 @@ int SrsDvrHssPlan::on_dvr_reap_flv_header(string path)
return ret;
}
int SrsDvrHssPlan::update_duration(__SrsSharedPtrMessage* msg)
int SrsDvrHssPlan::update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -998,11 +998,11 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata)
return ret;
}
int SrsDvr::on_audio(__SrsSharedPtrMessage* audio)
int SrsDvr::on_audio(SrsSharedPtrMessage* audio)
{
int ret = ERROR_SUCCESS;
SrsAutoFree(__SrsSharedPtrMessage, audio, false);
SrsAutoFree(SrsSharedPtrMessage, audio, false);
if ((ret = plan->on_audio(audio)) != ERROR_SUCCESS) {
return ret;
@ -1011,11 +1011,11 @@ int SrsDvr::on_audio(__SrsSharedPtrMessage* audio)
return ret;
}
int SrsDvr::on_video(__SrsSharedPtrMessage* video)
int SrsDvr::on_video(SrsSharedPtrMessage* video)
{
int ret = ERROR_SUCCESS;
SrsAutoFree(__SrsSharedPtrMessage, video, false);
SrsAutoFree(SrsSharedPtrMessage, video, false);
if ((ret = plan->on_video(video)) != ERROR_SUCCESS) {
return ret;

View file

@ -36,7 +36,7 @@ class SrsRequest;
class SrsStream;
class SrsRtmpJitter;
class SrsOnMetaDataPacket;
class __SrsSharedPtrMessage;
class SrsSharedPtrMessage;
/**
* file stream to read/write file.
@ -185,13 +185,13 @@ public:
virtual int on_publish();
virtual void on_unpublish() = 0;
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_audio(__SrsSharedPtrMessage* audio);
virtual int on_video(__SrsSharedPtrMessage* video);
virtual int on_audio(SrsSharedPtrMessage* audio);
virtual int on_video(SrsSharedPtrMessage* video);
protected:
virtual int flv_open(std::string stream, std::string path);
virtual int flv_close();
virtual int open_new_segment();
virtual int update_duration(__SrsSharedPtrMessage* msg);
virtual int update_duration(SrsSharedPtrMessage* msg);
virtual int write_flv_header();
virtual int on_dvr_request_sh();
virtual int on_video_keyframe();
@ -233,7 +233,7 @@ public:
virtual int on_publish();
virtual void on_unpublish();
private:
virtual int update_duration(__SrsSharedPtrMessage* msg);
virtual int update_duration(SrsSharedPtrMessage* msg);
};
/**
@ -260,7 +260,7 @@ protected:
virtual int64_t filter_timestamp(int64_t timestamp);
private:
virtual int on_dvr_reap_flv_header(std::string path);
virtual int update_duration(__SrsSharedPtrMessage* msg);
virtual int update_duration(SrsSharedPtrMessage* msg);
};
/**
@ -300,11 +300,11 @@ public:
/**
* mux the audio packets to dvr.
*/
virtual int on_audio(__SrsSharedPtrMessage* audio);
virtual int on_audio(SrsSharedPtrMessage* audio);
/**
* mux the video packets to dvr.
*/
virtual int on_video(__SrsSharedPtrMessage* video);
virtual int on_video(SrsSharedPtrMessage* video);
};
#endif

View file

@ -175,15 +175,15 @@ int SrsEdgeIngester::ingest()
}
// read from client.
__SrsMessage* msg = NULL;
if ((ret = client->__recv_message(&msg)) != ERROR_SUCCESS) {
SrsMessage* msg = NULL;
if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("ingest recv origin server message failed. ret=%d", ret);
return ret;
}
srs_verbose("edge loop recv message. ret=%d", ret);
srs_assert(msg);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
return ret;
@ -193,7 +193,7 @@ int SrsEdgeIngester::ingest()
return ret;
}
int SrsEdgeIngester::process_publish_message(__SrsMessage* msg)
int SrsEdgeIngester::process_publish_message(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -218,7 +218,7 @@ int SrsEdgeIngester::process_publish_message(__SrsMessage* msg)
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((ret = client->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
@ -420,8 +420,8 @@ int SrsEdgeForwarder::cycle()
// read from client.
if (true) {
__SrsMessage* msg = NULL;
ret = client->__recv_message(&msg);
SrsMessage* msg = NULL;
ret = client->recv_message(&msg);
srs_verbose("edge loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
@ -435,7 +435,7 @@ int SrsEdgeForwarder::cycle()
// forward all messages.
int count = 0;
__SrsSharedPtrMessage** msgs = NULL;
SrsSharedPtrMessage** msgs = NULL;
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward to origin failed. ret=%d", ret);
return ret;
@ -456,16 +456,16 @@ int SrsEdgeForwarder::cycle()
srs_verbose("no packets to forward.");
continue;
}
SrsAutoFree(__SrsSharedPtrMessage*, msgs, true);
SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
// all msgs to forward.
for (int i = 0; i < count; i++) {
__SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs[i];
srs_assert(msg);
msgs[i] = NULL;
if ((ret = client->__send_and_free_message(msg)) != ERROR_SUCCESS) {
if ((ret = client->send_and_free_message(msg)) != ERROR_SUCCESS) {
srs_error("edge publish forwarder send message to server failed. ret=%d", ret);
return ret;
}
@ -475,7 +475,7 @@ int SrsEdgeForwarder::cycle()
return ret;
}
int SrsEdgeForwarder::proxy(__SrsMessage* msg)
int SrsEdgeForwarder::proxy(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -495,8 +495,8 @@ int SrsEdgeForwarder::proxy(__SrsMessage* msg)
}
// TODO: FIXME: use utility to copy msg to shared ptr msg.
__SrsSharedPtrMessage* copy = new __SrsSharedPtrMessage();
SrsAutoFree(__SrsSharedPtrMessage, copy, false);
SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
SrsAutoFree(SrsSharedPtrMessage, copy, false);
if ((ret = copy->initialize(msg)) != ERROR_SUCCESS) {
srs_error("initialize the msg failed. ret=%d", ret);
return ret;
@ -724,7 +724,7 @@ int SrsPublishEdge::on_client_publish()
return ret;
}
int SrsPublishEdge::on_proxy_publish(__SrsMessage* msg)
int SrsPublishEdge::on_proxy_publish(SrsMessage* msg)
{
return forwarder->proxy(msg);
}

View file

@ -40,7 +40,7 @@ class SrsRequest;
class SrsPlayEdge;
class SrsPublishEdge;
class SrsRtmpClient;
class __SrsMessage;
class SrsMessage;
class SrsMessageQueue;
class ISrsProtocolReaderWriter;
@ -99,7 +99,7 @@ private:
virtual int ingest();
virtual void close_underlayer_socket();
virtual int connect_server();
virtual int process_publish_message(__SrsMessage* msg);
virtual int process_publish_message(SrsMessage* msg);
};
/**
@ -142,7 +142,7 @@ public:
public:
virtual int cycle();
public:
virtual int proxy(__SrsMessage* msg);
virtual int proxy(SrsMessage* msg);
private:
virtual void close_underlayer_socket();
virtual int connect_server();
@ -202,7 +202,7 @@ public:
/**
* proxy publish stream to edge
*/
virtual int on_proxy_publish(__SrsMessage* msg);
virtual int on_proxy_publish(SrsMessage* msg);
/**
* proxy unpublish stream to edge.
*/

View file

@ -148,7 +148,7 @@ void SrsForwarder::on_unpublish()
srs_freep(io);
}
int SrsForwarder::on_meta_data(__SrsSharedPtrMessage* metadata)
int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
{
int ret = ERROR_SUCCESS;
@ -164,7 +164,7 @@ int SrsForwarder::on_meta_data(__SrsSharedPtrMessage* metadata)
return ret;
}
int SrsForwarder::on_audio(__SrsSharedPtrMessage* msg)
int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -180,7 +180,7 @@ int SrsForwarder::on_audio(__SrsSharedPtrMessage* msg)
return ret;
}
int SrsForwarder::on_video(__SrsSharedPtrMessage* msg)
int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -315,8 +315,8 @@ int SrsForwarder::forward()
// read from client.
if (true) {
__SrsMessage* msg = NULL;
ret = client->__recv_message(&msg);
SrsMessage* msg = NULL;
ret = client->recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
@ -329,7 +329,7 @@ int SrsForwarder::forward()
// forward all messages.
int count = 0;
__SrsSharedPtrMessage** msgs = NULL;
SrsSharedPtrMessage** msgs = NULL;
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret);
return ret;
@ -348,16 +348,16 @@ int SrsForwarder::forward()
srs_verbose("no packets to forward.");
continue;
}
SrsAutoFree(__SrsSharedPtrMessage*, msgs, true);
SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
// all msgs to forward.
for (int i = 0; i < count; i++) {
__SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs[i];
srs_assert(msg);
msgs[i] = NULL;
if ((ret = client->__send_and_free_message(msg)) != ERROR_SUCCESS) {
if ((ret = client->send_and_free_message(msg)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret);
return ret;
}

View file

@ -35,7 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_thread.hpp>
class ISrsProtocolReaderWriter;
class __SrsSharedPtrMessage;
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
class SrsMessageQueue;
class SrsRtmpJitter;
@ -72,9 +72,9 @@ public:
public:
virtual int on_publish(SrsRequest* req, std::string forward_server);
virtual void on_unpublish();
virtual int on_meta_data(__SrsSharedPtrMessage* metadata);
virtual int on_audio(__SrsSharedPtrMessage* msg);
virtual int on_video(__SrsSharedPtrMessage* msg);
virtual int on_meta_data(SrsSharedPtrMessage* metadata);
virtual int on_audio(SrsSharedPtrMessage* msg);
virtual int on_video(SrsSharedPtrMessage* msg);
// interface ISrsThreadHandler.
public:
virtual int cycle();

View file

@ -1379,11 +1379,11 @@ int SrsHls::on_meta_data(SrsAmf0Object* metadata)
return ret;
}
int SrsHls::on_audio(__SrsSharedPtrMessage* audio)
int SrsHls::on_audio(SrsSharedPtrMessage* audio)
{
int ret = ERROR_SUCCESS;
SrsAutoFree(__SrsSharedPtrMessage, audio, false);
SrsAutoFree(SrsSharedPtrMessage, audio, false);
if (!hls_enabled) {
return ret;
@ -1423,11 +1423,11 @@ int SrsHls::on_audio(__SrsSharedPtrMessage* audio)
return ret;
}
int SrsHls::on_video(__SrsSharedPtrMessage* video)
int SrsHls::on_video(SrsSharedPtrMessage* video)
{
int ret = ERROR_SUCCESS;
SrsAutoFree(__SrsSharedPtrMessage, video, false);
SrsAutoFree(SrsSharedPtrMessage, video, false);
if (!hls_enabled) {
return ret;

View file

@ -34,7 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <string>
#include <vector>
class __SrsSharedPtrMessage;
class SrsSharedPtrMessage;
class SrsCodecSample;
class SrsCodecBuffer;
class SrsMpegtsFrame;
@ -314,11 +314,11 @@ public:
/**
* mux the audio packets to ts.
*/
virtual int on_audio(__SrsSharedPtrMessage* audio);
virtual int on_audio(SrsSharedPtrMessage* audio);
/**
* mux the video packets to ts.
*/
virtual int on_video(__SrsSharedPtrMessage* video);
virtual int on_video(SrsSharedPtrMessage* video);
private:
virtual void hls_mux();
};

View file

@ -472,8 +472,8 @@ int SrsRtmpConn::playing(SrsSource* source)
// read from client.
int ctl_msg_ret = ERROR_SUCCESS;
if (true) {
__SrsMessage* msg = NULL;
ctl_msg_ret = ret = rtmp->__recv_message(&msg);
SrsMessage* msg = NULL;
ctl_msg_ret = ret = rtmp->recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
@ -491,7 +491,7 @@ int SrsRtmpConn::playing(SrsSource* source)
}
// get messages from consumer.
__SrsSharedPtrMessage** msgs = NULL;
SrsSharedPtrMessage** msgs = NULL;
int count = 0;
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
@ -510,11 +510,11 @@ int SrsRtmpConn::playing(SrsSource* source)
srs_verbose("no packets in queue.");
continue;
}
SrsAutoFree(__SrsSharedPtrMessage*, msgs, true);
SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
// sendout messages
for (int i = 0; i < count; i++) {
__SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs[i];
// the send_message will free the msg,
// so set the msgs[i] to NULL.
@ -530,7 +530,7 @@ int SrsRtmpConn::playing(SrsSource* source)
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
if ((ret = rtmp->__send_and_free_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->send_and_free_message(msg)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret);
return ret;
}
@ -573,13 +573,13 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
// switch to other st-threads.
st_usleep(0);
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("fmle recv identify client message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
pithy_print.elapse();
@ -594,7 +594,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("fmle decode unpublish message failed. ret=%d", ret);
return ret;
}
@ -648,15 +648,15 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
// switch to other st-threads.
st_usleep(0);
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("flash recv identify client message failed. ret=%d", ret);
}
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
pithy_print.elapse();
@ -671,7 +671,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("flash decode unpublish message failed. ret=%d", ret);
return ret;
}
@ -694,7 +694,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
return ret;
}
int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, bool vhost_is_edge)
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge)
{
int ret = ERROR_SUCCESS;
@ -727,7 +727,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
@ -750,7 +750,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b
return ret;
}
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, __SrsMessage* msg)
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -758,7 +758,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, __SrsMessage* m
srs_verbose("ignore all empty message.");
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsMessage, msg, false);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_info("ignore all message except amf0/amf3 command.");
@ -766,7 +766,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, __SrsMessage* m
}
SrsPacket* pkt = NULL;
if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret);
return ret;
}

View file

@ -40,7 +40,7 @@ class SrsResponse;
class SrsSource;
class SrsRefer;
class SrsConsumer;
class __SrsMessage;
class SrsMessage;
class SrsSocket;
#ifdef SRS_AUTO_HTTP_CALLBACK
class SrsHttpHooks;
@ -80,8 +80,8 @@ private:
virtual int playing(SrsSource* source);
virtual int fmle_publish(SrsSource* source);
virtual int flash_publish(SrsSource* source);
virtual int process_publish_message(SrsSource* source, __SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, __SrsMessage* msg);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
private:
virtual int http_hooks_on_connect();
virtual void http_hooks_on_close();

View file

@ -52,7 +52,7 @@ SrsRtmpJitter::~SrsRtmpJitter()
{
}
int SrsRtmpJitter::correct(__SrsSharedPtrMessage* msg, int tba, int tbv)
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
@ -130,7 +130,7 @@ void SrsMessageQueue::set_queue_size(double queue_size)
queue_size_ms = (int)(queue_size * 1000);
}
int SrsMessageQueue::enqueue(__SrsSharedPtrMessage* msg)
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -151,7 +151,7 @@ int SrsMessageQueue::enqueue(__SrsSharedPtrMessage* msg)
return ret;
}
int SrsMessageQueue::get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count)
int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
@ -169,13 +169,13 @@ int SrsMessageQueue::get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs,
return ret;
}
pmsgs = new __SrsSharedPtrMessage*[count];
pmsgs = new SrsSharedPtrMessage*[count];
for (int i = 0; i < count; i++) {
pmsgs[i] = msgs[i];
}
__SrsSharedPtrMessage* last = msgs[count - 1];
SrsSharedPtrMessage* last = msgs[count - 1];
av_start_time = last->header.timestamp;
if (count == (int)msgs.size()) {
@ -196,7 +196,7 @@ void SrsMessageQueue::shrink()
// for when we shrinked, the first is the iframe,
// we will directly remove the gop next time.
for (int i = 1; i < (int)msgs.size(); i++) {
__SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs[i];
if (msg->header.is_video()) {
if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
@ -222,7 +222,7 @@ void SrsMessageQueue::shrink()
// remove the first gop from the front
for (int i = 0; i < iframe_index; i++) {
__SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs[i];
srs_freep(msg);
}
msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
@ -230,10 +230,10 @@ void SrsMessageQueue::shrink()
void SrsMessageQueue::clear()
{
std::vector<__SrsSharedPtrMessage*>::iterator it;
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
__SrsSharedPtrMessage* msg = *it;
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
@ -267,7 +267,7 @@ int SrsConsumer::get_time()
return jitter->get_time();
}
int SrsConsumer::enqueue(__SrsSharedPtrMessage* msg, int tba, int tbv)
int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
@ -285,7 +285,7 @@ int SrsConsumer::enqueue(__SrsSharedPtrMessage* msg, int tba, int tbv)
return ret;
}
int SrsConsumer::get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count)
int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
{
// paused, return nothing.
if (paused) {
@ -329,7 +329,7 @@ void SrsGopCache::set(bool enabled)
srs_info("enable gop cache");
}
int SrsGopCache::cache(__SrsSharedPtrMessage* msg)
int SrsGopCache::cache(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -368,9 +368,9 @@ int SrsGopCache::cache(__SrsSharedPtrMessage* msg)
void SrsGopCache::clear()
{
std::vector<__SrsSharedPtrMessage*>::iterator it;
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
__SrsSharedPtrMessage* msg = *it;
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
gop_cache.clear();
@ -382,9 +382,9 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
std::vector<__SrsSharedPtrMessage*>::iterator it;
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
__SrsSharedPtrMessage* msg = *it;
SrsSharedPtrMessage* msg = *it;
if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) {
srs_error("dispatch cached gop failed. ret=%d", ret);
return ret;
@ -406,7 +406,7 @@ int64_t SrsGopCache::get_start_time()
return 0;
}
__SrsSharedPtrMessage* msg = gop_cache[0];
SrsSharedPtrMessage* msg = gop_cache[0];
srs_assert(msg);
return msg->header.timestamp;
@ -789,7 +789,7 @@ bool SrsSource::can_publish()
return _can_publish;
}
int SrsSource::on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata)
int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
@ -823,13 +823,7 @@ int SrsSource::on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata)
}
// encode the metadata to payload
int size = metadata->get_payload_length();
if (size <= 0) {
srs_warn("ignore the invalid metadata. size=%d", size);
return ret;
}
srs_verbose("get metadata size success.");
int size = 0;
char* payload = NULL;
if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
srs_error("encode metadata error. ret=%d", ret);
@ -838,9 +832,14 @@ int SrsSource::on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata)
}
srs_verbose("encode metadata success.");
if (size <= 0) {
srs_warn("ignore the invalid metadata. size=%d", size);
return ret;
}
// create a shared ptr message.
srs_freep(cache_metadata);
cache_metadata = new __SrsSharedPtrMessage();
cache_metadata = new SrsSharedPtrMessage();
// dump message to shared ptr message.
if ((ret = cache_metadata->initialize(&msg->header, payload, size)) != ERROR_SUCCESS) {
@ -877,12 +876,12 @@ int SrsSource::on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata)
return ret;
}
int SrsSource::on_audio(__SrsMessage* audio)
int SrsSource::on_audio(SrsMessage* audio)
{
int ret = ERROR_SUCCESS;
__SrsSharedPtrMessage* msg = new __SrsSharedPtrMessage();
SrsAutoFree(__SrsSharedPtrMessage, msg, false);
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
SrsAutoFree(SrsSharedPtrMessage, msg, false);
if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) {
srs_error("initialize the audio failed. ret=%d", ret);
return ret;
@ -966,12 +965,12 @@ int SrsSource::on_audio(__SrsMessage* audio)
return ret;
}
int SrsSource::on_video(__SrsMessage* video)
int SrsSource::on_video(SrsMessage* video)
{
int ret = ERROR_SUCCESS;
__SrsSharedPtrMessage* msg = new __SrsSharedPtrMessage();
SrsAutoFree(__SrsSharedPtrMessage, msg, false);
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
SrsAutoFree(SrsSharedPtrMessage, msg, false);
if ((ret = msg->initialize(video)) != ERROR_SUCCESS) {
srs_error("initialize the video failed. ret=%d", ret);
return ret;
@ -1207,7 +1206,7 @@ int SrsSource::on_edge_start_publish()
return publish_edge->on_client_publish();
}
int SrsSource::on_edge_proxy_publish(__SrsMessage* msg)
int SrsSource::on_edge_proxy_publish(SrsMessage* msg)
{
return publish_edge->on_proxy_publish(msg);
}

View file

@ -40,9 +40,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsPlayEdge;
class SrsPublishEdge;
class SrsSource;
class __SrsMessage;
class SrsMessage;
class SrsOnMetaDataPacket;
class __SrsSharedPtrMessage;
class SrsSharedPtrMessage;
class SrsForwarder;
class SrsRequest;
class SrsSocket;
@ -74,7 +74,7 @@ public:
/**
* detect the time jitter and correct it.
*/
virtual int correct(__SrsSharedPtrMessage* msg, int tba, int tbv);
virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv);
/**
* get current client time, the last packet time.
*/
@ -91,7 +91,7 @@ private:
int64_t av_start_time;
int64_t av_end_time;
int queue_size_ms;
std::vector<__SrsSharedPtrMessage*> msgs;
std::vector<SrsSharedPtrMessage*> msgs;
public:
SrsMessageQueue();
virtual ~SrsMessageQueue();
@ -106,14 +106,14 @@ public:
* enqueue the message, the timestamp always monotonically.
* @param msg, the msg to enqueue, user never free it whatever the return code.
*/
virtual int enqueue(__SrsSharedPtrMessage* msg);
virtual int enqueue(SrsSharedPtrMessage* msg);
/**
* get packets in consumer queue.
* @pmsgs SrsMessages*[], output the prt array.
* @count the count in array.
* @max_count the max count to dequeue, 0 to dequeue all.
*/
virtual int get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count);
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
private:
/**
* remove a gop from the front.
@ -150,14 +150,14 @@ public:
* @param tbv timebase of video.
* used to calc the video time delta if time-jitter detected.
*/
virtual int enqueue(__SrsSharedPtrMessage* msg, int tba, int tbv);
virtual int enqueue(SrsSharedPtrMessage* msg, int tba, int tbv);
/**
* get packets in consumer queue.
* @pmsgs SrsMessages*[], output the prt array.
* @count the count in array.
* @max_count the max count to dequeue, 0 to dequeue all.
*/
virtual int get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count);
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
/**
* when client send the pause message.
*/
@ -185,7 +185,7 @@ private:
/**
* cached gop.
*/
std::vector<__SrsSharedPtrMessage*> gop_cache;
std::vector<SrsSharedPtrMessage*> gop_cache;
public:
SrsGopCache();
virtual ~SrsGopCache();
@ -196,7 +196,7 @@ public:
* 1. cache the gop when got h264 video packet.
* 2. clear gop when got keyframe.
*/
virtual int cache(__SrsSharedPtrMessage* msg);
virtual int cache(SrsSharedPtrMessage* msg);
virtual void clear();
virtual int dump(SrsConsumer* consumer, int tba, int tbv);
/**
@ -267,11 +267,11 @@ private:
// TODO: FIXME: to support reload atc.
bool atc;
private:
__SrsSharedPtrMessage* cache_metadata;
SrsSharedPtrMessage* cache_metadata;
// the cached video sequence header.
__SrsSharedPtrMessage* cache_sh_video;
SrsSharedPtrMessage* cache_sh_video;
// the cached audio sequence header.
__SrsSharedPtrMessage* cache_sh_audio;
SrsSharedPtrMessage* cache_sh_audio;
public:
/**
* @param _req the client request object,
@ -299,9 +299,9 @@ public:
virtual int on_dvr_request_sh();
public:
virtual bool can_publish();
virtual int on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(__SrsMessage* audio);
virtual int on_video(__SrsMessage* video);
virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsMessage* audio);
virtual int on_video(SrsMessage* video);
/**
* publish stream event notify.
* @param _req the request from client, the source will deep copy it,
@ -322,7 +322,7 @@ public:
// for edge, when publish edge stream, check the state
virtual int on_edge_start_publish();
// for edge, proxy the publish
virtual int on_edge_proxy_publish(__SrsMessage* msg);
virtual int on_edge_proxy_publish(SrsMessage* msg);
// for edge, proxy stop publish
virtual void on_edge_proxy_unpublish();
private: