1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

client connection no disconnect

This commit is contained in:
zhengfl 2014-10-02 19:45:04 +08:00
parent 9b8f6ff962
commit 1e34d2a5cd
5 changed files with 38 additions and 2 deletions

23
trunk/src/app/srs_app_recv_thread.cpp Normal file → Executable file
View file

@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_core_performance.hpp> #include <srs_core_performance.hpp>
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
#include <srs_app_source.hpp>
using namespace std; using namespace std;
@ -138,11 +139,12 @@ SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms)
{ {
rtmp = rtmp_sdk; rtmp = rtmp_sdk;
recv_error_code = ERROR_SUCCESS; recv_error_code = ERROR_SUCCESS;
_consumer = NULL;
} }
SrsQueueRecvThread::~SrsQueueRecvThread() SrsQueueRecvThread::~SrsQueueRecvThread()
{ {
trd.stop(); stop();
// clear all messages. // clear all messages.
std::vector<SrsCommonMessage*>::iterator it; std::vector<SrsCommonMessage*>::iterator it;
@ -160,6 +162,7 @@ int SrsQueueRecvThread::start()
void SrsQueueRecvThread::stop() void SrsQueueRecvThread::stop()
{ {
_consumer = NULL;
trd.stop(); trd.stop();
} }
@ -195,7 +198,13 @@ bool SrsQueueRecvThread::can_handle()
// for the message may cause the thread to stop, // for the message may cause the thread to stop,
// when stop, the thread is freed, so the messages // when stop, the thread is freed, so the messages
// are dropped. // are dropped.
return empty(); bool e = empty();
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (_consumer && !e) {
_consumer->on_dispose();
}
#endif
return e;
} }
int SrsQueueRecvThread::handle(SrsCommonMessage* msg) int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
@ -209,6 +218,11 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
void SrsQueueRecvThread::on_recv_error(int ret) void SrsQueueRecvThread::on_recv_error(int ret)
{ {
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (_consumer) {
_consumer->on_dispose();
}
#endif
recv_error_code = ret; recv_error_code = ret;
} }
@ -226,6 +240,11 @@ void SrsQueueRecvThread::on_thread_stop()
rtmp->set_auto_response(true); rtmp->set_auto_response(true);
} }
void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer)
{
_consumer = consumer;
}
SrsPublishRecvThread::SrsPublishRecvThread( SrsPublishRecvThread::SrsPublishRecvThread(
SrsRtmpServer* rtmp_sdk, SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRequest* _req, int mr_sock_fd, int timeout_ms,

4
trunk/src/app/srs_app_recv_thread.hpp Normal file → Executable file
View file

@ -42,6 +42,7 @@ class SrsCommonMessage;
class SrsRtmpConn; class SrsRtmpConn;
class SrsSource; class SrsSource;
class SrsRequest; class SrsRequest;
class SrsConsumer;
/** /**
* for the recv thread to handle the message. * for the recv thread to handle the message.
@ -112,6 +113,7 @@ private:
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
// the recv thread error code. // the recv thread error code.
int recv_error_code; int recv_error_code;
SrsConsumer *_consumer;
public: public:
SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms);
virtual ~SrsQueueRecvThread(); virtual ~SrsQueueRecvThread();
@ -130,6 +132,8 @@ public:
public: public:
virtual void on_thread_start(); virtual void on_thread_start();
virtual void on_thread_stop(); virtual void on_thread_stop();
public:
virtual void set_consumer(SrsConsumer *consumer);
}; };
/** /**

1
trunk/src/app/srs_app_rtmp_conn.cpp Normal file → Executable file
View file

@ -595,6 +595,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// when mw_sleep changed, resize the socket send buffer. // when mw_sleep changed, resize the socket send buffer.
mw_enabled = true; mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
trd->set_consumer(consumer);
while (true) { while (true) {
// to use isolate thread to recv, can improve about 33% performance. // to use isolate thread to recv, can improve about 33% performance.

8
trunk/src/app/srs_app_source.cpp Normal file → Executable file
View file

@ -515,6 +515,14 @@ void SrsConsumer::wait(int nb_msgs, int duration)
// use cond block wait for high performance mode. // use cond block wait for high performance mode.
st_cond_wait(mw_wait); st_cond_wait(mw_wait);
} }
void SrsConsumer::on_dispose()
{
if (mw_waiting) {
st_cond_signal(mw_wait);
mw_waiting = false;
}
}
#endif #endif
int SrsConsumer::on_play_client_pause(bool is_pause) int SrsConsumer::on_play_client_pause(bool is_pause)

4
trunk/src/app/srs_app_source.hpp Normal file → Executable file
View file

@ -246,6 +246,10 @@ public:
* @param duration the messgae duration to wait. * @param duration the messgae duration to wait.
*/ */
virtual void wait(int nb_msgs, int duration); virtual void wait(int nb_msgs, int duration);
/**
* when waiting, a message incomming, we rouse it
*/
virtual void on_dispose();
#endif #endif
/** /**
* when client send the pause message. * when client send the pause message.