1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For 2034, GB28181: Support transport over TCP

This commit is contained in:
yinjiaoyuan 2020-11-15 22:50:59 +08:00 committed by winlin
parent 751dab56d8
commit fe65c7bf84
12 changed files with 807 additions and 46 deletions

View file

@ -472,6 +472,338 @@ srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from,
return err;
}
//SrsGb28181TcpPsRtpProcessor
SrsGb28181TcpPsRtpProcessor::SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string id)
{
config = c;
pprint = SrsPithyPrint::create_caster();
channel_id = id;
}
SrsGb28181TcpPsRtpProcessor::~SrsGb28181TcpPsRtpProcessor()
{
dispose();
srs_freep(pprint);
}
void SrsGb28181TcpPsRtpProcessor::dispose()
{
map<std::string, SrsPsRtpPacket*>::iterator it2;
for (it2 = cache_ps_rtp_packet.begin(); it2 != cache_ps_rtp_packet.end(); ++it2) {
srs_freep(it2->second);
}
cache_ps_rtp_packet.clear();
clear_pre_packet();
return;
}
void SrsGb28181TcpPsRtpProcessor::clear_pre_packet()
{
map<std::string, SrsPsRtpPacket*>::iterator it;
for (it = pre_packet.begin(); it != pre_packet.end(); ++it) {
srs_freep(it->second);
}
pre_packet.clear();
}
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp(char* buf, int nb_buf, std::string ip, int port)
{
srs_error_t err = srs_success;
if (config->jitterbuffer_enable) {
err = on_rtp_packet_jitter(buf, nb_buf, ip, port);
if (err != srs_success) {
srs_warn("SrsGb28181TcpPsRtpProcessor::on_rtp on_rtp_packet_jitter err");
}
}
else {
return on_rtp_packet(buf, nb_buf, ip, port);
}
return err;
}
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet(char* buf, int nb_buf, std::string ip, int port)
{
srs_error_t err = srs_success;
bool completed = false;
pprint->elapse();
char address_string[64] = {0};
char port_string[16] = {0};
/*if (getnameinfo(from, fromlen,
(char*)&address_string, sizeof(address_string),
(char*)&port_string, sizeof(port_string),
NI_NUMERICHOST | NI_NUMERICSERV)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address");
}*/
//itoa(port, port_string, 10);
int peer_port = port;// atoi(port_string);
if (true) {
SrsBuffer stream(buf, nb_buf);
SrsPsRtpPacket pkt;
if ((err = pkt.decode(&stream)) != srs_success) {
return srs_error_wrap(err, "ps rtp decode error");
}
//TODO: fixme: the same device uses the same SSRC to send with different local ports
std::stringstream ss;
ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port;// port_string;
std::string pkt_key = ss.str();
std::stringstream ss2;
ss2 << pkt.ssrc << ":" << port_string;
std::string pre_pkt_key = ss2.str();
if (pre_packet.find(pre_pkt_key) == pre_packet.end()) {
pre_packet[pre_pkt_key] = new SrsPsRtpPacket();
pre_packet[pre_pkt_key]->copy(&pkt);
}
//cache pkt by ssrc and timestamp
if (cache_ps_rtp_packet.find(pkt_key) == cache_ps_rtp_packet.end()) {
cache_ps_rtp_packet[pkt_key] = new SrsPsRtpPacket();
}
//get previous timestamp by ssrc
uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp;
uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number;
//TODO: check sequence number out of order
//it may be out of order, or multiple streaming ssrc are the same
if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number &&
pre_sequence_number != pkt.sequence_number) {
srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)",
pkt.ssrc, pre_sequence_number, pkt.sequence_number, ip.c_str(), port_string);
//return err;
}
//copy header to cache
cache_ps_rtp_packet[pkt_key]->copy(&pkt);
//accumulate one frame of data, to payload cache
cache_ps_rtp_packet[pkt_key]->payload->append(pkt.payload);
//detect whether it is a completed frame
if (pkt.marker) {// rtp maker is true, is a completed frame
completed = true;
}
else if (pre_timestamp != pkt.timestamp) {
//current timestamp is different from previous timestamp
//previous timestamp, is a completed frame
std::stringstream ss;
ss << pkt.ssrc << ":" << pre_timestamp << ":" << port_string;
pkt_key = ss.str();
if (cache_ps_rtp_packet.find(pkt_key) != cache_ps_rtp_packet.end()) {
completed = true;
}
}
if (pprint->can_print()) {
srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
channel_id.c_str(), ip.c_str(), peer_port, nb_buf, pprint->age(), pkt.version,
pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc,
pkt.payload->length()
);
}
//current packet becomes previous packet
srs_freep(pre_packet[pre_pkt_key]);
pre_packet[pre_pkt_key] = new SrsPsRtpPacket();
pre_packet[pre_pkt_key]->copy(&pkt);;
if (!completed) {
return err;
}
//process completed frame data
//clear processed one ps frame
//on completed frame data rtp packet in muxer enqueue
map<std::string, SrsPsRtpPacket*>::iterator key = cache_ps_rtp_packet.find(pkt_key);
if (key != cache_ps_rtp_packet.end())
{
SrsGb28181RtmpMuxer* muxer = NULL;
//First, search according to the channel_id. Otherwise, search according to the SSRC.
//Some channel_id are created by RTP pool, which are different ports.
//No channel_id are created by multiplexing ports, which are the same port
if (!channel_id.empty()) {
muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id);
}
else {
muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(pkt.ssrc);
}
//auto crate channel
if (!muxer && config->auto_create_channel) {
//auto create channel generated id
std::stringstream ss, ss1;
ss << "chid" << pkt.ssrc;
std::string tmp_id = ss.str();
SrsGb28181StreamChannel channel;
channel.set_channel_id(tmp_id);
channel.set_port_mode(RTP_PORT_MODE_FIXED);
channel.set_ssrc(pkt.ssrc);
srs_error_t err2 = srs_success;
if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) {
srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str());
srs_error_reset(err2);
};
muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id);
}
if (muxer) {
//TODO: fixme: the same device uses the same SSRC to send with different local ports
//record the first peer port
muxer->set_channel_peer_port(peer_port);
muxer->set_channel_peer_ip(address_string);
//not the first peer port's non processing
if (muxer->channel_peer_port() != peer_port) {
srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d",
muxer->get_channel_id().c_str(), pkt.ssrc, muxer->channel_peer_port(), peer_port);
srs_freep(key->second);
}
else {
//put it in queue, wait for consumer to process, and then free
muxer->ps_packet_enqueue(key->second);
}
}
else {
//no consumer process it, discarded
srs_freep(key->second);
}
cache_ps_rtp_packet.erase(pkt_key);
}
}
return err;
}
SrsGb28181RtmpMuxer* SrsGb28181TcpPsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc)
{
if (true) {
SrsGb28181RtmpMuxer* muxer = NULL;
//First, search according to the channel_id. Otherwise, search according to the SSRC.
//Some channel_id are created by RTP pool, which are different ports.
//No channel_id are created by multiplexing ports, which are the same port
if (!channel_id.empty()) {
muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id);
}
else {
muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc);
}
//auto crate channel
if (!muxer && config->auto_create_channel) {
//auto create channel generated id
std::stringstream ss, ss1;
ss << "chid" << ssrc;
std::string tmp_id = ss.str();
SrsGb28181StreamChannel channel;
channel.set_channel_id(tmp_id);
channel.set_port_mode(RTP_PORT_MODE_FIXED);
channel.set_ssrc(ssrc);
srs_error_t err2 = srs_success;
if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) {
srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str());
srs_error_reset(err2);
};
muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id);
}
return muxer;
}//end if FoundFrame
}
srs_error_t SrsGb28181TcpPsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc,
int peer_port, std::string address_string, SrsPsRtpPacket *pkt)
{
srs_error_t err = srs_success;
if (!muxer)
return err;
if (muxer) {
//TODO: fixme: the same device uses the same SSRC to send with different local ports
//record the first peer port
muxer->set_channel_peer_port(peer_port);
muxer->set_channel_peer_ip(address_string);
//not the first peer port's non processing
if (muxer->channel_peer_port() != peer_port) {
srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d",
muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port);
}
else {
//muxer->ps_packet_enqueue(pkt);
muxer->insert_jitterbuffer(pkt);
}//end if (muxer->channel_peer_port() != peer_port)
}//end if (muxer)
return err;
}
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port)
{
srs_error_t err = srs_success;
bool completed = false;
pprint->elapse();
char address_string[64] = {0};
char port_string[16] = {0};
/*if (getnameinfo(from, fromlen,
(char*)&address_string, sizeof(address_string),
(char*)&port_string, sizeof(port_string),
NI_NUMERICHOST | NI_NUMERICSERV)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address");
}*/
//itoa(port, port_string, 10);
int peer_port = port;// atoi(port_string);
if (true) {
SrsBuffer stream(buf, nb_buf);
SrsPsRtpPacket *pkt = new SrsPsRtpPacket();;
if ((err = pkt->decode(&stream)) != srs_success) {
srs_freep(pkt);
return srs_error_wrap(err, "ps rtp decode error");
}
std::stringstream ss3;
ss3 << pkt->ssrc << ":" << port;// port_string;
std::string jitter_key = ss3.str();
pkt->completed = pkt->marker;
if (pprint->can_print()) {
srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version,
pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc,
pkt->payload->length()
);
}
SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc);
if (muxer) {
rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, ip, pkt);
}
SrsAutoFree(SrsPsRtpPacket, pkt);
}
return err;
}
//ISrsPsStreamHander ps stream raw video/audio hander interface
ISrsPsStreamHander::ISrsPsStreamHander()
{
@ -901,6 +1233,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c)
host = get_host_candidate_ips(c);
output = _srs_config->get_stream_caster_output(c);
rtp_mux_port = _srs_config->get_stream_caster_listen(c);
rtp_mux_tcp_enable = _srs_config->get_stream_caster_tcp_enable(c);
rtp_port_min = _srs_config->get_stream_caster_rtp_port_min(c);
rtp_port_max = _srs_config->get_stream_caster_rtp_port_max(c);
rtp_idle_timeout = _srs_config->get_stream_caster_gb28181_rtp_idle_timeout(c);
@ -2042,13 +2375,15 @@ void SrsGb28181Manger::rtmpmuxer_unmap_by_ssrc(uint32_t ssrc)
void SrsGb28181Manger::destroy()
{
//destory ps rtp listen
std::map<uint32_t, SrsPsRtpListener*>::iterator it;
for (it = rtp_pool.begin(); it != rtp_pool.end(); ++it) {
SrsPsRtpListener* listener = it->second;
srs_freep(listener);
}
rtp_pool.clear();
if (!config->rtp_mux_tcp_enable) {
//destory ps rtp listen
std::map<uint32_t, SrsPsRtpListener*>::iterator it;
for (it = rtp_pool.begin(); it != rtp_pool.end(); ++it) {
SrsPsRtpListener* listener = it->second;
srs_freep(listener);
}
rtp_pool.clear();
}
//destory gb28181 muxer
std::map<std::string, SrsGb28181RtmpMuxer*>::iterator it2;
@ -2092,17 +2427,19 @@ srs_error_t SrsGb28181Manger::start_ps_rtp_listen(std::string id, int port)
return srs_error_wrap(err, "start rtp listen port rtmp muxer is null");
}
if (rtp_pool.find(port) == rtp_pool.end())
{
SrsPsRtpListener* rtp = new SrsPsRtpListener(this->config, port, id);
rtp_pool[port] = rtp;
if ((err = rtp_pool[port]->listen()) != srs_success) {
stop_rtp_listen(id);
return srs_error_wrap(err, "rtp listen");
}
if (!config->rtp_mux_tcp_enable) {
if (rtp_pool.find(port) == rtp_pool.end())
{
SrsPsRtpListener* rtp = new SrsPsRtpListener(this->config, port, id);
rtp_pool[port] = rtp;
if ((err = rtp_pool[port]->listen()) != srs_success) {
stop_rtp_listen(id);
return srs_error_wrap(err, "rtp listen");
}
srs_trace("gb28181: start rtp ps stream over server-port=%d", port);
}
srs_trace("gb28181: start rtp ps stream over server-port=%d", port);
}
}
return err;
}
@ -2122,12 +2459,13 @@ void SrsGb28181Manger::stop_rtp_listen(std::string id)
return;
}
map<uint32_t, SrsPsRtpListener*>::iterator it2 = rtp_pool.find(port);
if (it2 != rtp_pool.end()){
srs_freep(it2->second);
rtp_pool.erase(it2);
}
if (!config->rtp_mux_tcp_enable) {
map<uint32_t, SrsPsRtpListener*>::iterator it2 = rtp_pool.find(port);
if (it2 != rtp_pool.end()) {
srs_freep(it2->second);
rtp_pool.erase(it2);
}
}
free_port(port, port+1);
}
@ -2411,3 +2749,224 @@ srs_error_t SrsGb28181Manger::query_sip_session(std::string id, SrsJsonArray* ar
return sip_service->query_sip_session(id, arr);
}
#define SRS_RTSP_BUFFER 262144
SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor)
{
caster = c;
stfd = fd;
skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt);
trd = new SrsSTCoroutine("rtsp", this);
mbuffer = (char*)malloc(SRS_RTSP_BUFFER);
processor = rtp_processor;
}
SrsGb28181Conn::~SrsGb28181Conn()
{
free(mbuffer);
srs_close_stfd(stfd);
srs_freep(trd);
srs_freep(skt);
srs_freep(rtsp);
}
srs_error_t SrsGb28181Conn::serve()
{
srs_error_t err = srs_success;
if ((err = skt->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "socket initialize");
}
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "rtsp connection");
}
return err;
}
std::string SrsGb28181Conn::remote_ip()
{
// TODO: FIXME: Implement it.
return "";
}
srs_error_t SrsGb28181Conn::do_cycle()
{
srs_error_t err = srs_success;
// retrieve ip of client.
int fd = srs_netfd_fileno(stfd);
std::string ip = srs_get_peer_ip(fd);
int port = srs_get_peer_port(fd);
if (ip.empty() && !_srs_config->empty_ip_ok()) {
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
}
srs_trace("rtsp: serve %s:%d", ip.c_str(), port);
char* leftData = (char*)malloc(SRS_RTSP_BUFFER);;
uint32_t leftDataLength = 0;
int16_t length = 0;
char* pp = (char*)&length;
char* p = &(mbuffer[0]);
ssize_t nb_read = 0;
int16_t length2;
// consume all rtp data.
while (true) {
if ((err = trd->pull()) != srs_success) {
free(leftData);
return srs_error_wrap(err, "rtsp cycle");
}
//memset(buffer, 0, SRS_RTSP_BUFFER);
nb_read = 0;
if ((err = skt->read(mbuffer + leftDataLength, SRS_RTSP_BUFFER - leftDataLength, &nb_read)) != srs_success) {
free(leftData);
return srs_error_wrap(err, "recv data");
}
nb_read = nb_read + leftDataLength;
length;
pp = (char*)&length;
p = &(mbuffer[0]);
pp[1] = *p++;
pp[0] = *p++;
if (nb_read < (length + 2)) {//Not enough one packet.
leftDataLength = leftDataLength + nb_read;
continue;
}
memset(leftData, 0, SRS_RTSP_BUFFER);
while (length > 0) {
if ((length + 2) == nb_read) {//Only one packet.
nb_read = nb_read - 2;
processor->on_rtp(mbuffer + 2, nb_read, ip, port);
leftDataLength = 0;
break;
}
else { //multi packets.
pp = (char*)&length2;
p = &(mbuffer[length + 2]);
pp[1] = *p++;
pp[0] = *p++;
processor->on_rtp(mbuffer + 2, length, ip, port);
leftDataLength = nb_read - (length + 2);
nb_read = leftDataLength;
memcpy(leftData, mbuffer + length + 2, leftDataLength);
pp = (char*)&length;
p = &(mbuffer[length + 2]);
pp[1] = *p++;
pp[0] = *p++;
if (leftDataLength < (length + 2)) {//Not enough one packet.
memcpy(mbuffer, leftData, leftDataLength);
break;
}
else {
memcpy(mbuffer, leftData, leftDataLength);
}
}
}
}
free(leftData);
return err;
}
srs_error_t SrsGb28181Conn::cycle()
{
// serve the rtsp client.
srs_error_t err = do_cycle();
caster->remove(this);
if (err == srs_success) {
srs_trace("client finished.");
}
else if (srs_is_client_gracefully_close(err)) {
srs_warn("client disconnect peer. code=%d", srs_error_code(err));
srs_freep(err);
}
return err;
}
std::string SrsGb28181Conn::desc()
{
return "GB28181TcpConn";
}
const SrsContextId& SrsGb28181Conn::get_id()
{
return trd->cid();
}
SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c)
{
// TODO: FIXME: support reload.
output = _srs_config->get_stream_caster_output(c);
config = new SrsGb28181Config(c);
rtp_processor = new SrsGb28181TcpPsRtpProcessor(config, "");
manager = new SrsResourceManager("GB28181TCP", true);
}
SrsGb28181Caster::~SrsGb28181Caster()
{
std::vector<SrsGb28181Conn*>::iterator it;
for (it = clients.begin(); it != clients.end(); ++it) {
SrsGb28181Conn* conn = *it;
manager->remove(conn);
}
clients.clear();
srs_freep(manager);
}
srs_error_t SrsGb28181Caster::initialize()
{
srs_error_t err = srs_success;
if ((err = manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
return err;
}
srs_error_t SrsGb28181Caster::on_tcp_client(srs_netfd_t stfd)
{
srs_error_t err = srs_success;
SrsGb28181Conn* conn = new SrsGb28181Conn(this, stfd, rtp_processor);
if ((err = conn->serve()) != srs_success) {
srs_freep(conn);
return srs_error_wrap(err, "serve conn");
}
clients.push_back(conn);
return err;
}
void SrsGb28181Caster::remove(SrsGb28181Conn* conn)
{
std::vector<SrsGb28181Conn*>::iterator it = find(clients.begin(), clients.end(), conn);
if (it != clients.end()) {
clients.erase(it);
}
srs_info("rtsp: remove connection from caster.");
manager->remove(conn);
}