1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, source support complex error

This commit is contained in:
winlin 2017-09-23 22:12:33 +08:00
parent abcaba33ee
commit 9802dc326e
24 changed files with 1029 additions and 1437 deletions

View file

@ -189,11 +189,10 @@ srs_error_t SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest
srs_error_t SrsEdgeIngester::start()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((ret = source->on_publish()) != ERROR_SUCCESS) {
return srs_error_new(ret, "notify source");
if ((err = source->on_publish()) != srs_success) {
return srs_error_wrap(err, "notify source");
}
srs_freep(trd);
@ -262,8 +261,8 @@ srs_error_t SrsEdgeIngester::do_cycle()
// reset the redirect to empty, for maybe the origin changed.
redirect = "";
if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) {
return srs_error_new(ret, "on source id changed");
if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "on source id changed");
}
if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) {
@ -274,17 +273,17 @@ srs_error_t SrsEdgeIngester::do_cycle()
return srs_error_new(ret, "notify edge play");
}
ret = ingest();
err = ingest();
// retry for rtmp 302 immediately.
if (ret == ERROR_CONTROL_REDIRECT) {
ret = ERROR_SUCCESS;
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
continue;
}
if (srs_is_client_gracefully_close(ret)) {
srs_warn("origin disconnected, retry. ret=%d", ret);
ret = ERROR_SUCCESS;
if (srs_is_client_gracefully_close(err)) {
srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
}
break;
}
@ -292,9 +291,10 @@ srs_error_t SrsEdgeIngester::do_cycle()
return srs_error_new(ret, "cycle");
}
int SrsEdgeIngester::ingest()
srs_error_t SrsEdgeIngester::ingest()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
@ -305,10 +305,7 @@ int SrsEdgeIngester::ingest()
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
return srs_error_wrap(err, "thread quit");
}
pprint->elapse();
@ -321,82 +318,71 @@ int SrsEdgeIngester::ingest()
// read from client.
SrsCommonMessage* msg = NULL;
if ((ret = upstream->recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("pull origin server message failed. ret=%d", ret);
}
return ret;
return srs_error_new(ret, "recv message");
}
srs_verbose("edge loop recv message. ret=%d", ret);
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg);
if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
return ret;
if ((err = process_publish_message(msg)) != srs_success) {
return srs_error_wrap(err, "process message");
}
}
return ret;
return err;
}
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// process audio packet
if (msg->header.is_audio()) {
if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("source process audio message failed. ret=%d", ret);
return ret;
if ((err = source->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "source consume audio");
}
}
// process video packet
if (msg->header.is_video()) {
if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
srs_error("source process video message failed. ret=%d", ret);
return ret;
if ((err = source->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "source consume video");
}
}
// process aggregate packet
if (msg->header.is_aggregate()) {
if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
srs_error("source process aggregate message failed. ret=%d", ret);
return ret;
if ((err = source->on_aggregate(msg)) != srs_success) {
return srs_error_wrap(err, "source consume aggregate");
}
return ret;
return err;
}
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "decode message");
}
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
srs_error("source process onMetaData message failed. ret=%d", ret);
return ret;
if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
return srs_error_wrap(err, "source consume metadata");
}
srs_info("process onMetaData message success.");
return ret;
return err;
}
srs_info("ignore AMF0/AMF3 data message.");
return ret;
return err;
}
// call messages, for example, reject, redirect.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
SrsPacket* pkt = NULL;
if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode call message failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "decode message");
}
SrsAutoFree(SrsPacket, pkt);
@ -404,35 +390,33 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
if (dynamic_cast<SrsCallPacket*>(pkt)) {
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
if (!call->arguments->is_object()) {
return ret;
return err;
}
SrsAmf0Any* prop = NULL;
SrsAmf0Object* evt = call->arguments->to_object();
if ((prop = evt->ensure_property_string("level")) == NULL) {
return ret;
return err;
} else if (prop->to_str() != StatusLevelError) {
return ret;
return err;
}
if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
return ret;
return err;
}
SrsAmf0Object* ex = prop->to_object();
if ((prop = ex->ensure_property_string("redirect")) == NULL) {
return ret;
return err;
}
redirect = prop->to_str();
ret = ERROR_CONTROL_REDIRECT;
srs_info("RTMP 302 redirect to %s, ret=%d", redirect.c_str(), ret);
return ret;
return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());
}
}
return ret;
return err;
}
SrsEdgeForwarder::SrsEdgeForwarder()
@ -591,8 +575,8 @@ srs_error_t SrsEdgeForwarder::do_cycle()
// forward all messages.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
return srs_error_new(ret, "queue dumps packets");
if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {
return srs_error_wrap(err, "queue dumps packets");
}
pprint->elapse();
@ -617,13 +601,13 @@ srs_error_t SrsEdgeForwarder::do_cycle()
return err;
}
int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((ret = send_error_code) != ERROR_SUCCESS) {
srs_error("publish edge proxy thread send error, ret=%d", ret);
return ret;
return srs_error_new(ret, "edge forwarder");
}
// the msg is auto free by source,
@ -631,24 +615,21 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
if (msg->size <= 0
|| msg->header.is_set_chunk_size()
|| msg->header.is_window_ackledgement_size()
|| msg->header.is_ackledgement()
) {
return ret;
|| msg->header.is_ackledgement()) {
return err;
}
SrsSharedPtrMessage copy;
if ((ret = copy.create(msg)) != ERROR_SUCCESS) {
srs_error("initialize the msg failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "create message");
}
srs_verbose("initialize shared ptr msg success.");
copy.stream_id = sdk->sid();
if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) {
srs_error("enqueue edge publish msg failed. ret=%d", ret);
if ((err = queue->enqueue(copy.copy())) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
return ret;
return err;
}
SrsPlayEdge::SrsPlayEdge()
@ -761,17 +742,13 @@ bool SrsPublishEdge::can_publish()
return state != SrsEdgeStatePublish;
}
int SrsPublishEdge::on_client_publish()
srs_error_t SrsPublishEdge::on_client_publish()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// error when not init state.
if (state != SrsEdgeStateInit) {
ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
srs_error("invalid state for client to publish stream on edge. "
"state=%d, ret=%d", state, ret);
return ret;
return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state");
}
// @see https://github.com/ossrs/srs/issues/180
@ -786,22 +763,18 @@ int SrsPublishEdge::on_client_publish()
// start to forward stream to origin.
err = forwarder->start();
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
// @see https://github.com/ossrs/srs/issues/180
// when failed, revert to init
if (ret != ERROR_SUCCESS) {
if (err != srs_success) {
SrsEdgeState pstate = state;
state = SrsEdgeStateInit;
srs_trace("edge revert from %d to state %d (push). ret=%d", pstate, state, ret);
srs_trace("edge revert from %d to state %d (push), error %s", pstate, state, srs_error_desc(err).c_str());
}
return ret;
return err;
}
int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
srs_error_t SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
{
return forwarder->proxy(msg);
}