1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add publish edge framework

This commit is contained in:
winlin 2014-04-27 09:29:37 +08:00
parent 270b1270af
commit ec96072472
6 changed files with 365 additions and 22 deletions

View file

@ -67,7 +67,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(pthread);
}
int SrsEdgeIngester::initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req)
int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
@ -306,19 +306,226 @@ int SrsEdgeIngester::connect_server()
return ret;
}
SrsEdge::SrsEdge()
SrsEdgeForwarder::SrsEdgeForwarder()
{
io = NULL;
client = NULL;
_edge = NULL;
_req = NULL;
origin_index = 0;
stream_id = 0;
stfd = NULL;
pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US);
}
SrsEdgeForwarder::~SrsEdgeForwarder()
{
stop();
srs_freep(pthread);
}
int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
_source = source;
_edge = edge;
_req = req;
return ret;
}
int SrsEdgeForwarder::start()
{
return pthread->start();
}
void SrsEdgeForwarder::stop()
{
pthread->stop();
close_underlayer_socket();
srs_freep(client);
srs_freep(io);
}
int SrsEdgeForwarder::cycle()
{
int ret = ERROR_SUCCESS;
if ((ret = connect_server()) != ERROR_SUCCESS) {
return ret;
}
srs_assert(client);
client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_SEND_TIMEOUT_US);
SrsRequest* req = _req;
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->connect_app(req->app, req->tcUrl)) != ERROR_SUCCESS) {
srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret);
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}
if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
if ((ret = _source->on_publish()) != ERROR_SUCCESS) {
srs_error("edge ingester play stream then publish to edge failed. ret=%d", ret);
return ret;
}
if ((ret = _edge->on_forward_publish()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = forward()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsEdgeForwarder::forward()
{
int ret = ERROR_SUCCESS;
client->set_recv_timeout(SRS_EDGE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
while (pthread->can_loop()) {
// switch to other st-threads.
st_usleep(0);
pithy_print.elapse();
// pithy print
if (pithy_print.can_print()) {
srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
}
// read from client.
SrsCommonMessage* msg = NULL;
if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv origin server message failed. ret=%d", ret);
return ret;
}
srs_verbose("edge loop recv message. ret=%d", ret);
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg, false);
}
return ret;
}
void SrsEdgeForwarder::close_underlayer_socket()
{
srs_close_stfd(stfd);
}
int SrsEdgeForwarder::connect_server()
{
int ret = ERROR_SUCCESS;
// reopen
close_underlayer_socket();
// TODO: FIXME: support reload
SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
srs_assert(conf);
// select the origin.
std::string server = conf->args.at(origin_index % conf->args.size());
origin_index = (origin_index + 1) % conf->args.size();
std::string s_port = RTMP_DEFAULT_PORT;
int port = ::atoi(RTMP_DEFAULT_PORT);
size_t pos = server.find(":");
if (pos != std::string::npos) {
s_port = server.substr(pos + 1);
server = server.substr(0, pos);
port = ::atoi(s_port.c_str());
}
// open socket.
srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
// TODO: FIXME: extract utility method
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
srs_freep(client);
srs_freep(io);
io = new SrsSocket(stfd);
client = new SrsRtmpClient(io);
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
return ret;
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
return ret;
}
SrsPlayEdge::SrsPlayEdge()
{
state = SrsEdgeStateInit;
user_state = SrsEdgeUserStateInit;
ingester = new SrsEdgeIngester();
}
SrsEdge::~SrsEdge()
SrsPlayEdge::~SrsPlayEdge()
{
srs_freep(ingester);
}
int SrsEdge::initialize(SrsSource* source, SrsRequest* req)
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
@ -329,7 +536,7 @@ int SrsEdge::initialize(SrsSource* source, SrsRequest* req)
return ret;
}
int SrsEdge::on_client_play()
int SrsPlayEdge::on_client_play()
{
int ret = ERROR_SUCCESS;
@ -350,7 +557,7 @@ int SrsEdge::on_client_play()
return ret;
}
void SrsEdge::on_all_client_stop()
void SrsPlayEdge::on_all_client_stop()
{
// when all client disconnected,
// and edge is ingesting origin stream, abort it.
@ -365,7 +572,7 @@ void SrsEdge::on_all_client_stop()
}
}
int SrsEdge::on_ingest_play()
int SrsPlayEdge::on_ingest_play()
{
int ret = ERROR_SUCCESS;
@ -382,3 +589,52 @@ int SrsEdge::on_ingest_play()
return ret;
}
SrsPublishEdge::SrsPublishEdge()
{
state = SrsEdgeStateInit;
user_state = SrsEdgeUserStateInit;
forwarder = new SrsEdgeForwarder();
}
SrsPublishEdge::~SrsPublishEdge()
{
srs_freep(forwarder);
}
int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
if ((ret = forwarder->initialize(source, this, req)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsPublishEdge::on_client_publish()
{
int ret = ERROR_SUCCESS;
// error state.
if (user_state != SrsEdgeUserStateInit) {
ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
srs_error("invalid state for client to play stream on edge. "
"state=%d, user_state=%d, ret=%d", state, user_state, ret);
return ret;
}
// start ingest when init state.
if (state == SrsEdgeStateInit) {
state = SrsEdgeStatePublish;
}
return ret;
}
int SrsPublishEdge::on_forward_publish()
{
int ret = ERROR_SUCCESS;
return ret;
}