1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch '3.0release' into 4.0release

This commit is contained in:
winlin 2020-03-21 22:42:17 +08:00
commit be746da21c
9 changed files with 118 additions and 27 deletions

View file

@ -198,19 +198,32 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
srs_error_t err = srs_success;
SrsStSocket skt;
if ((err = skt.initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}
if ((err = parser->parse_message(&skt, preq)) != srs_success) {
return srs_error_wrap(err, "parse message");
// Check user interrupt by interval.
skt.set_recv_timeout(3 * SRS_UTIME_SECONDS);
// drop all request body.
char body[4096];
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "timeout");
}
if ((err = skt.read(body, 4096, NULL)) != srs_success) {
// Because we use timeout to check trd state, so we should ignore any timeout.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err);
continue;
}
return srs_error_wrap(err, "read response");
}
}
// Attach owner connection to message.
SrsHttpMessage* hreq = (SrsHttpMessage*)(*preq);
hreq->set_connection(this);
return err;
}
@ -219,12 +232,12 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
srs_error_t err = srs_success;
ISrsHttpResponseReader* br = msg->body_reader();
// when not specified the content length, ignore.
if (msg->content_length() == -1) {
return err;
}
// drop all request body.
char body[4096];
while (!br->eof()) {
@ -236,6 +249,11 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
return err;
}
void SrsResponseOnlyHttpConn::expire()
{
SrsHttpConn::expire();
}
SrsHttpServer::SrsHttpServer(SrsServer* svr)
{
server = svr;

View file

@ -101,6 +101,9 @@ public:
virtual srs_error_t pop_message(ISrsHttpMessage** preq);
public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);
public:
// Set connection to expired.
virtual void expire();
};
// The http server, use http stream or static server to serve requests.

View file

@ -592,10 +592,15 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// Use receive thread to accept the close event to avoid FD leak.
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id(), req, NULL, SrsRtmpConnPlay)) != srs_success) {
if ((err = stat->on_client(_srs_context->get_id(), req, hc, SrsRtmpConnPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
@ -613,11 +618,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);
// Use receive thread to accept the close event to avoid FD leak.
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);

View file

@ -24,6 +24,6 @@
#ifndef SRS_CORE_VERSION3_HPP
#define SRS_CORE_VERSION3_HPP
#define SRS_VERSION3_REVISION 134
#define SRS_VERSION3_REVISION 137
#endif

View file

@ -336,21 +336,25 @@ srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int s
#endif
// send in a time.
if (size < limits) {
if (size <= limits) {
if ((err = skt->writev(iovs, size, pnwrite)) != srs_success) {
return srs_error_wrap(err, "writev");
}
return err;
}
// send in multiple times.
int cur_iov = 0;
ssize_t nwrite = 0;
while (cur_iov < size) {
int cur_count = srs_min(limits, size - cur_iov);
if ((err = skt->writev(iovs + cur_iov, cur_count, pnwrite)) != srs_success) {
if ((err = skt->writev(iovs + cur_iov, cur_count, &nwrite)) != srs_success) {
return srs_error_wrap(err, "writev");
}
cur_iov += cur_count;
if (pnwrite) {
*pnwrite += nwrite;
}
}
return err;

View file

@ -230,8 +230,6 @@ srs_error_t MockBufferIO::writev(const iovec *iov, int iov_size, ssize_t* nwrite
total += writen;
}
sbytes += total;
if (nwrite) {
*nwrite = total;
}
@ -6412,3 +6410,65 @@ VOID TEST(ProtocolKbpsTest, RAWStatistic)
}
}
VOID TEST(ProtocolKbpsTest, WriteLargeIOVs)
{
srs_error_t err;
if (true) {
iovec iovs[1];
iovs[0].iov_base = (char*)"Hello";
iovs[0].iov_len = 5;
MockBufferIO io;
ssize_t nn = 0;
HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, 1, &nn));
EXPECT_EQ(5, nn);
EXPECT_EQ(5, io.sbytes);
}
if (true) {
iovec iovs[1024];
int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec));
for (int i = 0; i < nn_iovs; i++) {
iovs[i].iov_base = (char*)"Hello";
iovs[i].iov_len = 5;
}
MockBufferIO io;
ssize_t nn = 0;
HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn));
EXPECT_EQ(5 * nn_iovs, nn);
EXPECT_EQ(5 * nn_iovs, io.sbytes);
}
if (true) {
iovec iovs[1025];
int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec));
for (int i = 0; i < nn_iovs; i++) {
iovs[i].iov_base = (char*)"Hello";
iovs[i].iov_len = 5;
}
MockBufferIO io;
ssize_t nn = 0;
HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn));
EXPECT_EQ(5 * nn_iovs, nn);
EXPECT_EQ(5 * nn_iovs, io.sbytes);
}
if (true) {
iovec iovs[4096];
int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec));
for (int i = 0; i < nn_iovs; i++) {
iovs[i].iov_base = (char*)"Hello";
iovs[i].iov_len = 5;
}
MockBufferIO io;
ssize_t nn = 0;
HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn));
EXPECT_EQ(5 * nn_iovs, nn);
EXPECT_EQ(5 * nn_iovs, io.sbytes);
}
}