1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for bug #251, remove the SrsMessage, use SrsCommonMessage or SrsSharedPtrMessage. 2.0.60

This commit is contained in:
winlin 2014-12-05 23:03:52 +08:00
parent 528ae1e9b1
commit f9b9a60de7
21 changed files with 502 additions and 509 deletions

View file

@ -94,12 +94,12 @@ int _srs_expect_bandwidth_packet(SrsRtmpServer* rtmp, _CheckPacketType pfn)
int ret = ERROR_SUCCESS;
while (true) {
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(SrsMessage, msg);
SrsAutoFree(SrsCommonMessage, msg);
SrsAutoFree(SrsBandwidthPacket, pkt);
srs_info("get bwtc message success.");
@ -380,12 +380,12 @@ int SrsBandwidth::publish_checking(SrsBandwidthSample* sample, SrsKbpsLimit* lim
srs_update_system_time_ms();
int64_t starttime = srs_get_system_time_ms();
while ((srs_get_system_time_ms() - starttime) < sample->duration_ms) {
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(SrsMessage, msg);
SrsAutoFree(SrsCommonMessage, msg);
SrsAutoFree(SrsBandwidthPacket, pkt);
srs_info("get publish message success.");

View file

@ -183,7 +183,7 @@ int SrsEdgeIngester::ingest()
}
// read from client.
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("pull origin server message failed. ret=%d", ret);
@ -193,7 +193,7 @@ int SrsEdgeIngester::ingest()
srs_verbose("edge loop recv message. ret=%d", ret);
srs_assert(msg);
SrsAutoFree(SrsMessage, msg);
SrsAutoFree(SrsCommonMessage, msg);
if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
return ret;
@ -256,7 +256,7 @@ int SrsEdgeIngester::connect_app(string ep_server, string ep_port)
return ret;
}
int SrsEdgeIngester::process_publish_message(SrsMessage* msg)
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -485,7 +485,7 @@ int SrsEdgeForwarder::cycle()
// read from client.
if (true) {
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
ret = client->recv_message(&msg);
srs_verbose("edge loop recv message. ret=%d", ret);
@ -534,7 +534,7 @@ int SrsEdgeForwarder::cycle()
return ret;
}
int SrsEdgeForwarder::proxy(SrsMessage* msg)
int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -825,7 +825,7 @@ int SrsPublishEdge::on_client_publish()
return ret;
}
int SrsPublishEdge::on_proxy_publish(SrsMessage* msg)
int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
{
return forwarder->proxy(msg);
}

View file

@ -42,7 +42,7 @@ class SrsRequest;
class SrsPlayEdge;
class SrsPublishEdge;
class SrsRtmpClient;
class SrsMessage;
class SrsCommonMessage;
class SrsMessageQueue;
class ISrsProtocolReaderWriter;
class SrsKbps;
@ -104,7 +104,7 @@ private:
virtual void close_underlayer_socket();
virtual int connect_server(std::string& ep_server, std::string& ep_port);
virtual int connect_app(std::string ep_server, std::string ep_port);
virtual int process_publish_message(SrsMessage* msg);
virtual int process_publish_message(SrsCommonMessage* msg);
};
/**
@ -148,7 +148,7 @@ public:
public:
virtual int cycle();
public:
virtual int proxy(SrsMessage* msg);
virtual int proxy(SrsCommonMessage* msg);
private:
virtual void close_underlayer_socket();
virtual int connect_server(std::string& ep_server, std::string& ep_port);
@ -214,7 +214,7 @@ public:
/**
* proxy publish stream to edge
*/
virtual int on_proxy_publish(SrsMessage* msg);
virtual int on_proxy_publish(SrsCommonMessage* msg);
/**
* proxy unpublish stream to edge.
*/

View file

@ -404,7 +404,7 @@ int SrsForwarder::forward()
// read from client.
if (true) {
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
ret = client->recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret);

View file

@ -81,7 +81,7 @@ int SrsRecvThread::cycle()
continue;
}
SrsMessage* msg = NULL;
SrsCommonMessage* msg = NULL;
// recv and handle message
ret = rtmp->recv_message(&msg);
@ -145,9 +145,9 @@ SrsQueueRecvThread::~SrsQueueRecvThread()
trd.stop();
// clear all messages.
std::vector<SrsMessage*>::iterator it;
std::vector<SrsCommonMessage*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsMessage* msg = *it;
SrsCommonMessage* msg = *it;
srs_freep(msg);
}
queue.clear();
@ -173,11 +173,11 @@ int SrsQueueRecvThread::size()
return (int)queue.size();
}
SrsMessage* SrsQueueRecvThread::pump()
SrsCommonMessage* SrsQueueRecvThread::pump()
{
srs_assert(!queue.empty());
SrsMessage* msg = *queue.begin();
SrsCommonMessage* msg = *queue.begin();
queue.erase(queue.begin());
@ -198,7 +198,7 @@ bool SrsQueueRecvThread::can_handle()
return empty();
}
int SrsQueueRecvThread::handle(SrsMessage* msg)
int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
{
// put into queue, the send thread will get and process it,
// @see SrsRtmpConn::process_play_control_msg
@ -335,7 +335,7 @@ bool SrsPublishRecvThread::can_handle()
return true;
}
int SrsPublishRecvThread::handle(SrsMessage* msg)
int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;

View file

@ -38,7 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_reload.hpp>
class SrsRtmpServer;
class SrsMessage;
class SrsCommonMessage;
class SrsRtmpConn;
class SrsSource;
class SrsRequest;
@ -62,7 +62,7 @@ public:
/**
* process the received message.
*/
virtual int handle(SrsMessage* msg) = 0;
virtual int handle(SrsCommonMessage* msg) = 0;
/**
* when recv message error.
*/
@ -107,7 +107,7 @@ public:
class SrsQueueRecvThread : public ISrsMessageHandler
{
private:
std::vector<SrsMessage*> queue;
std::vector<SrsCommonMessage*> queue;
SrsRecvThread trd;
SrsRtmpServer* rtmp;
// the recv thread error code.
@ -121,11 +121,11 @@ public:
public:
virtual bool empty();
virtual int size();
virtual SrsMessage* pump();
virtual SrsCommonMessage* pump();
virtual int error_code();
public:
virtual bool can_handle();
virtual int handle(SrsMessage* msg);
virtual int handle(SrsCommonMessage* msg);
virtual void on_recv_error(int ret);
public:
virtual void on_thread_start();
@ -183,7 +183,7 @@ public:
// interface ISrsMessageHandler
public:
virtual bool can_handle();
virtual int handle(SrsMessage* msg);
virtual int handle(SrsCommonMessage* msg);
virtual void on_recv_error(int ret);
// interface IMergeReadHandler
public:

View file

@ -576,7 +576,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
while (!trd->empty()) {
SrsMessage* msg = trd->pump();
SrsCommonMessage* msg = trd->pump();
srs_verbose("pump client message to process.");
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
@ -628,7 +628,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// we start to collect the durations for each message.
if (user_specified_duration_to_stop) {
for (int i = 0; i < count; i++) {
SrsMessage* msg = msgs.msgs[i];
SrsSharedPtrMessage* msg = msgs.msgs[i];
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
@ -806,7 +806,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
return ret;
}
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge)
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
{
int ret = ERROR_SUCCESS;
@ -850,7 +850,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsMessage* msg, bool
return ret;
}
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge)
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
{
int ret = ERROR_SUCCESS;
@ -915,7 +915,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsMessage* msg, boo
return ret;
}
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg)
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -923,7 +923,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg
srs_verbose("ignore all empty message.");
return ret;
}
SrsAutoFree(SrsMessage, msg);
SrsAutoFree(SrsCommonMessage, msg);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_info("ignore all message except amf0/amf3 command.");

View file

@ -40,7 +40,7 @@ class SrsResponse;
class SrsSource;
class SrsRefer;
class SrsConsumer;
class SrsMessage;
class SrsCommonMessage;
class SrsStSocket;
#ifdef SRS_AUTO_HTTP_CALLBACK
class SrsHttpHooks;
@ -101,9 +101,9 @@ private:
virtual int fmle_publishing(SrsSource* source);
virtual int flash_publishing(SrsSource* source);
virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
virtual int handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
virtual void change_mw_sleep(int sleep_ms);
private:
virtual int check_edge_token_traverse_auth();

View file

@ -208,7 +208,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
return ret;
}
int SrsMessageQueue::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
@ -951,7 +951,7 @@ bool SrsSource::can_publish()
return _can_publish;
}
int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
@ -1072,7 +1072,7 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
return ret;
}
int SrsSource::on_audio(SrsMessage* __audio)
int SrsSource::on_audio(SrsCommonMessage* __audio)
{
int ret = ERROR_SUCCESS;
@ -1183,7 +1183,7 @@ int SrsSource::on_audio(SrsMessage* __audio)
return ret;
}
int SrsSource::on_video(SrsMessage* __video)
int SrsSource::on_video(SrsCommonMessage* __video)
{
int ret = ERROR_SUCCESS;
@ -1287,7 +1287,7 @@ int SrsSource::on_video(SrsMessage* __video)
return ret;
}
int SrsSource::on_aggregate(SrsMessage* msg)
int SrsSource::on_aggregate(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -1349,7 +1349,7 @@ int SrsSource::on_aggregate(SrsMessage* msg)
// to common message.
SrsCommonMessage __o;
SrsMessage& o = __o;
SrsCommonMessage& o = __o;
o.header.message_type = type;
o.header.payload_length = data_size;
@ -1570,7 +1570,7 @@ int SrsSource::on_edge_start_publish()
return publish_edge->on_client_publish();
}
int SrsSource::on_edge_proxy_publish(SrsMessage* msg)
int SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{
return publish_edge->on_proxy_publish(msg);
}

View file

@ -41,7 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsPlayEdge;
class SrsPublishEdge;
class SrsSource;
class SrsMessage;
class SrsCommonMessage;
class SrsOnMetaDataPacket;
class SrsSharedPtrMessage;
class SrsForwarder;
@ -139,11 +139,11 @@ public:
virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
/**
* get packets in consumer queue.
* @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
* @pmsgs SrsCommonMessages*[], used to store the msgs, user must alloc it.
* @count the count in array, output param.
* @max_count the max count to dequeue, must be positive.
*/
virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
private:
/**
* remove a gop from the front.
@ -395,10 +395,10 @@ public:
// logic data methods
public:
virtual bool can_publish();
virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsMessage* audio);
virtual int on_video(SrsMessage* video);
virtual int on_aggregate(SrsMessage* msg);
virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsCommonMessage* audio);
virtual int on_video(SrsCommonMessage* video);
virtual int on_aggregate(SrsCommonMessage* msg);
/**
* the pre-publish is we are very sure we are
* trying to publish stream, please lock the resource,
@ -425,7 +425,7 @@ public:
// for edge, when publish edge stream, check the state
virtual int on_edge_start_publish();
// for edge, proxy the publish
virtual int on_edge_proxy_publish(SrsMessage* msg);
virtual int on_edge_proxy_publish(SrsCommonMessage* msg);
// for edge, proxy stop publish
virtual void on_edge_proxy_unpublish();
private: