diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 0f45df751..513a55b87 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -328,7 +328,22 @@ stream_caster { # default: off audio_enable off; # The exposed IP to receive media stream. - host 192.168.1.3; + # * Retrieve server IP automatically, from all network interfaces. + # eth0 Retrieve server IP by specified network interface name. # TODO: Implements it. + # $CANDIDATE Read the IP from ENV variable $EIP, use * if not set, see https://github.com/ossrs/srs/issues/307#issuecomment-599028124 + # x.x.x.x A specified IP address or DNS name, which can be access by client such as Chrome. + # You can specific more than one interface name: + # eth0 eth1 Use network interface eth0 and eth1. # TODO: Implements it. + # Also by IP or DNS names: + # 192.168.1.3 10.1.2.3 rtc.me # TODO: Implements it. + # And by multiple ENV variables: + # $CANDIDATE $EIP # TODO: Implements it. + # default: * + host *; + #The media channel is automatically created according to the received RTP packet, + # and the channel ID is generated according to the RTP SSRC + # channelid format: 'chid[ssrc]' [ssrc] is rtp's ssrc + auto_create_channel off; sip { # Whether enable embeded SIP server. @@ -349,8 +364,6 @@ stream_caster { # The keepalive timeout in seconds. # default: 120 keepalive_timeout 120; - # Whether print SIP logs. - print_sip_message off; # Whether play immediately after registered. # default: on auto_play on; diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf index a106016c3..0b49b8e77 100644 --- a/trunk/conf/push.gb28181.conf +++ b/trunk/conf/push.gb28181.conf @@ -1,4 +1,4 @@ -# push gb28281 stream to SRS. +# push gb28181 stream to SRS. listen 1935; max_connections 1000; @@ -10,14 +10,20 @@ http_api { listen 1985; } +stats { + network 0; +} + stream_caster { enabled on; caster gb28181; # 转发流到rtmp服务器地址与端口 # TODO: https://github.com/ossrs/srs/pull/1679/files#r400875104 - # [stream] is VideoChannelCodecID(视频通道编码ID) - output 127.0.0.1:1935; + # [stream] is VideoChannelCodecID(视频通道编码ID) for sip + # 自动创建的道通[stream] 是‘chid[ssrc]’ [ssrc]是rtp的ssrc + # [ssrc] rtp中的ssrc + output rtmp://127.0.0.1:1935/live/[stream]; # 接收设备端rtp流的多路复用端口 listen 9000; @@ -52,8 +58,14 @@ stream_caster { # 也就是设备端将媒体发送的地址,如果是服务器是内外网 # 需要写外网地址, # 调用api创建stream session时返回ip地址也是host + # $CANDIDATE 是系统环境变量,从环境变量获取地址,如果没有配置,用* + # *代表指定stats network 的网卡号地址,如果没有配置network,默认则是第0号网卡地址 # TODO: https://github.com/ossrs/srs/pull/1679/files#r400917594 - host 192.168.1.27; + host $CANDIDATE; + + #根据收到ps rtp包自带创建rtmp媒体通道,不需要api接口创建 + #rtmp地址参数[stream] 就是通道id 格式chid[ssrc] + auto_create_channel off; sip { # 是否启用srs内部sip信令 @@ -78,11 +90,6 @@ stream_caster { # 认为设备离线 keepalive_timeout 120; - # 日志打印是否打印sip信息 - # off:不打印 - # on:打印接收或发送sip命令信息 - # TODO: https://github.com/ossrs/srs/pull/1679/files#r400929300 - print_sip_message off; # 注册之后是否自动给设备端发送invite # on: 是 off 不是,需要通过api控制 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 6868adfed..5829d47d4 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2164,8 +2164,11 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj) sobj->set(sdir->name, sdir->dumps_arg0_to_str()); } else if (sdir->name == "auto_play") { sobj->set(sdir->name, sdir->dumps_arg0_to_str()); + } else if (sdir->name == "auto_create_channel") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); } + } obj->set(dir->name, sobj); } else { @@ -3691,7 +3694,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max" && n != "rtp_idle_timeout" && n != "sip" && n != "audio_enable" && n != "wait_keyframe" - && n != "host") { + && n != "host" && n != "auto_create_channel") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal stream_caster.%s", n.c_str()); } @@ -4359,9 +4362,9 @@ srs_utime_t SrsConfig::get_stream_caster_gb28181_rtp_idle_timeout(SrsConfDirecti return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } -int SrsConfig::get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf) +srs_utime_t SrsConfig::get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf) { - static int DEFAULT = 30; + static srs_utime_t DEFAULT = 30 * SRS_UTIME_SECONDS; if (!conf) { return DEFAULT; @@ -4377,12 +4380,13 @@ int SrsConfig::get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf) return DEFAULT; } - return ::atoi(conf->arg0().c_str()); + return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } -int SrsConfig::get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf) +srs_utime_t SrsConfig::get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf) { - static int DEFAULT = 120; + static srs_utime_t DEFAULT = 120 * SRS_UTIME_SECONDS; + if (!conf) { return DEFAULT; @@ -4398,22 +4402,28 @@ int SrsConfig::get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* con return DEFAULT; } - return ::atoi(conf->arg0().c_str()); + return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } string SrsConfig::get_stream_caster_gb28181_host(SrsConfDirective* conf) { - static string DEFAULT = ""; - - if (!conf) { - return DEFAULT; - } + static string DEFAULT = "*"; conf = conf->get("host"); if (!conf || conf->arg0().empty()) { return DEFAULT; } + string eip = srs_getenv(conf->arg0()); + if (!eip.empty()) { + return eip; + } + + // If configed as ENV, but no ENV set, use default value. + if (srs_string_starts_with(conf->arg0(), "$")) { + return DEFAULT; + } + return conf->arg0(); } @@ -4475,26 +4485,26 @@ bool SrsConfig::get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf) return SRS_CONF_PERFER_FALSE(conf->arg0()); } -bool SrsConfig::get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf) -{ - static bool DEFAULT = false; +// bool SrsConfig::get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf) +// { +// static bool DEFAULT = false; - if (!conf) { - return DEFAULT; - } +// if (!conf) { +// return DEFAULT; +// } - conf = conf->get("sip"); - if (!conf) { - return DEFAULT; - } +// conf = conf->get("sip"); +// if (!conf) { +// return DEFAULT; +// } - conf = conf->get("print_sip_message"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } +// conf = conf->get("print_sip_message"); +// if (!conf || conf->arg0().empty()) { +// return DEFAULT; +// } - return SRS_CONF_PERFER_FALSE(conf->arg0()); -} +// return SRS_CONF_PERFER_FALSE(conf->arg0()); +// } bool SrsConfig::get_stream_caster_gb28181_wait_keyframe(SrsConfDirective* conf) { @@ -4599,6 +4609,22 @@ bool SrsConfig::get_stream_caster_gb28181_sip_invite_port_fixed(SrsConfDirective } +bool SrsConfig::get_stream_caster_gb28181_auto_create_channel(SrsConfDirective* conf) +{ + static bool DEFAULT = false; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("auto_create_channel"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + int SrsConfig::get_rtc_server_enabled() { SrsConfDirective* conf = root->get("rtc_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 3f01c0031..b717a233d 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -503,18 +503,19 @@ public: virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf); virtual srs_utime_t get_stream_caster_gb28181_rtp_idle_timeout(SrsConfDirective* conf); - virtual int get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf); - virtual int get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf); + virtual srs_utime_t get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf); + virtual srs_utime_t get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_host(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_serial(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_realm(SrsConfDirective* conf); - virtual bool get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf); + //virtual bool get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_wait_keyframe(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_sip_enable(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_sip_auto_play(SrsConfDirective* conf); virtual int get_stream_caster_gb28181_sip_listen(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_sip_invite_port_fixed(SrsConfDirective* conf); + virtual bool get_stream_caster_gb28181_auto_create_channel(SrsConfDirective* conf); // rtc section public: diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index ea196e61a..c964c9ac9 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -22,11 +22,7 @@ */ #include - -#include #include -#include -#include #include using namespace std; @@ -54,11 +50,10 @@ using namespace std; #include #include - - //#define W_PS_FILE //#define W_VIDEO_FILE //#define W_AUDIO_FILE +//#define W_UNKONW_FILE SrsPsRtpPacket::SrsPsRtpPacket() { @@ -181,9 +176,20 @@ void SrsGb28181PsRtpProcessor::dispose() } cache_ps_rtp_packet.clear(); + clear_pre_packet(); + return; } +void SrsGb28181PsRtpProcessor::clear_pre_packet() +{ + map::iterator it; + for (it = pre_packet.begin(); it != pre_packet.end(); ++it) { + srs_freep(it->second); + } + pre_packet.clear(); +} + srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -231,15 +237,16 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const //get previous timestamp by ssrc uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp; - //uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; + uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; //TODO: check sequence number out of order //it may be out of order, or multiple streaming ssrc are the same - // if (pre_sequence_number > pkt.sequence_number){ - // srs_info("gb28281: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, addr=%s,port=%s", - // pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); - // //return err; - // } + if (pre_sequence_number + 1 != pkt.sequence_number && + pre_sequence_number != pkt.sequence_number){ + srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", + pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); + //return err; + } //copy header to cache cache_ps_rtp_packet[pkt_key]->copy(&pkt); @@ -261,8 +268,9 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const } if (pprint->can_print()) { - srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", - channel_id.c_str(), nb_buf, pprint->age(), pkt.version, pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, + srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version, + pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, pkt.payload->length() ); } @@ -291,9 +299,24 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); }else { muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(pkt.ssrc); - } + //auto crate channel + if (!muxer && config->auto_create_channel){ + //auto create channel generated id + std::stringstream ss, ss1; + ss << "chid" << pkt.ssrc; + std::string tmp_id = ss.str(); + + SrsGb28181StreamChannel channel; + channel.set_channel_id(tmp_id); + channel.set_port_mode(RTP_PORT_MODE_FIXED); + channel.set_ssrc(pkt.ssrc); + _srs_gb28181->create_stream_channel(&channel); + + muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); + } + if (muxer){ //TODO: fixme: the same device uses the same SSRC to send with different local ports //record the first peer port @@ -305,11 +328,11 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const muxer->get_channel_id().c_str(), pkt.ssrc, muxer->channel_peer_port(), peer_port); srs_freep(key->second); }else { - //put it in queue, wait for conn to process, and then free + //put it in queue, wait for consumer to process, and then free muxer->ps_packet_enqueue(key->second); } }else{ - //no connection process it, discarded + //no consumer process it, discarded srs_freep(key->second); } cache_ps_rtp_packet.erase(pkt_key); @@ -461,7 +484,7 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ //in a frame of data, pts is obtained from the first PSE packet if (pse_index == 0 && pts_dts_flags > 0) { video_pts = parse_ps_timestamp((unsigned char*)next_ps_pack + 9); - srs_info("gb28181: ps stream video ts=%u pkt_ts=%u", pts, timestamp); + srs_info("gb28181: ps stream video ts=%u pkt_ts=%u", video_pts, timestamp); } pse_index +=1; @@ -544,15 +567,40 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ } else { - srs_trace("gb28181: client_id %s, unkonw ps data %02x %02x %02x %02x\n", - channel_id.c_str(), next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); + +#ifdef W_UNKONW_FILE + if (!unknow_fw.is_open()) { + std::string filename = "test_unknow_" + channel_id + ".mpg"; + unknow_fw.open(filename.c_str()); + } + unknow_fw.write(next_ps_pack, incomplete_len, NULL); +#endif + //TODO: fixme unkonw ps data parse + if (next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x00 + && next_ps_pack[3] == (char)0x01){ + //dahua's PS header may lose packets. It is sent by an RTP packet of Dahua's PS header + //dahua rtp send format: + //ts=1000 seq=1 mark=false payload= ps header + //ts=1000 seq=2 mark=false payload= video + //ts=1000 seq=3 mark=true payload= video + //ts=1000 seq=4 mark=true payload= audio + incomplete_len = ps_size - complete_len; + complete_len = complete_len + incomplete_len; + } + + srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n", + channel_id.c_str(), ssrc, timestamp, + next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); break; } } if (complete_len != ps_size){ - srs_trace("gb28181: client_id %s decode ps packet error! ps_size=%d complete=%d \n", - channel_id.c_str(), ps_size, complete_len); + srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n", + channel_id.c_str(), ssrc, timestamp, ps_size, complete_len); }else if (hander && video_stream.length() && can_send_ps_av_packet()) { if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) { return srs_error_wrap(err, "process ps video packet"); @@ -562,12 +610,23 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ return err; } +static std::string get_host_candidate_ips(SrsConfDirective* c) +{ + string candidate = _srs_config->get_stream_caster_gb28181_host(c); + if (candidate == "*" || candidate == "0.0.0.0") { + std::vector ips = srs_get_local_ips(); + int index = _srs_config->get_stats_network(); + return ips.at(index); + } else { + return candidate; + } +} //Gb28181 Config SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) { // TODO: FIXME: support reload. - host = _srs_config->get_stream_caster_gb28181_host(c); + host = get_host_candidate_ips(c); output = _srs_config->get_stream_caster_output(c); rtp_mux_port = _srs_config->get_stream_caster_listen(c); rtp_port_min = _srs_config->get_stream_caster_rtp_port_min(c); @@ -576,6 +635,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) wait_keyframe = _srs_config->get_stream_caster_gb28181_wait_keyframe(c); audio_enable = _srs_config->get_stream_caster_gb28181_audio_enable(c); + auto_create_channel = _srs_config->get_stream_caster_gb28181_auto_create_channel(c); //sip config sip_enable = _srs_config->get_stream_caster_gb28181_sip_enable(c); @@ -585,7 +645,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) sip_auto_play = _srs_config->get_stream_caster_gb28181_sip_auto_play(c); sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c); sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c); - print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c); + //print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c); sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c); } @@ -615,14 +675,15 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo ps_demixer = new SrsPsStreamDemixer(this, id, a, k); wait_ps_queue = srs_cond_new(); - h264_sps_changed = false; - h264_pps_changed = false; - h264_sps_pps_sent = false; - stream_idle_timeout = -1; recv_stream_time = 0; _rtmp_url = ""; + + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; + } SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() @@ -681,6 +742,8 @@ void SrsGb28181RtmpMuxer::set_channel_peer_ip(std::string ip) void SrsGb28181RtmpMuxer::set_channel_peer_port(int port) { if (channel->get_rtp_peer_port() == 0){ + channel->set_recv_time_str(srs_sip_get_utc_date()); + channel->set_recv_time(srs_get_system_time()); channel->set_rtp_peer_port(port); } } @@ -704,6 +767,11 @@ std::string SrsGb28181RtmpMuxer::rtmp_url() return _rtmp_url; } +srs_utime_t SrsGb28181RtmpMuxer::get_recv_stream_time() +{ + return recv_stream_time; +} + void SrsGb28181RtmpMuxer::destroy() { @@ -745,11 +813,25 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() } if (pprint->can_print()) { - srs_trace("gb28181: client id=%s, rtmp muxer is alive", channel_id.c_str()); + srs_trace("gb28181: client id=%s, ssrc=%#x, peer(%s, %d), rtmp muxer is alive", + channel_id.c_str(), channel->get_ssrc(), + channel->get_rtp_peer_ip().c_str(), + channel->get_rtp_peer_port()); } srs_utime_t now = srs_get_system_time(); srs_utime_t duration = now - recv_stream_time; + + //if no RTP data is received within 2 seconds, + //the peer-port and peer-ip will be cleared and + //other port data will be received again + if (duration > (2 * SRS_UTIME_SECONDS) && channel->get_rtp_peer_port() != 0){ + srs_warn("gb28181: client id=%s ssrc=%#x, peer(%s, %d), no rtp data %d in seconds, clean it, wait other port!", + channel_id.c_str(), channel->get_ssrc(), channel->get_rtp_peer_ip().c_str(), + channel->get_rtp_peer_port(), duration/SRS_UTIME_SECONDS); + channel->set_rtp_peer_port(0); + channel->set_rtp_peer_ip(""); + } SrsGb28181Config config = gb28181_manger->get_gb28181_config(); if (duration > config.rtp_idle_timeout){ @@ -757,8 +839,11 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() break; } - srs_cond_timedwait(wait_ps_queue, 5 * SRS_UTIME_MILLISECONDS); - //srs_usleep(1000 * 5); + if (ps_queue.empty()){ + srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); + }else { + srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS); + } } return err; @@ -777,6 +862,8 @@ void SrsGb28181RtmpMuxer::stop() void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) { srs_assert(pkt); + + recv_stream_time = srs_get_system_time(); //prevent consumers from being unable to process data //and accumulating in the queue @@ -870,7 +957,6 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f if (h264_sps == sps) { continue; } - h264_sps_changed = true; h264_sps = sps; if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { @@ -889,7 +975,6 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f if (h264_pps == pps) { continue; } - h264_pps_changed = true; h264_pps = pps; if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { @@ -1003,10 +1088,6 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) { srs_error_t err = srs_success; - if (!h264_sps_changed || !h264_pps_changed) { - return err; - } - // h264 raw to h264 packet. std::string sh; if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { @@ -1028,10 +1109,6 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) return srs_error_wrap(err, "write packet"); } - // reset sps and pps. - h264_sps_changed = false; - h264_pps_changed = false; - h264_sps_pps_sent = true; return err; } @@ -1140,6 +1217,12 @@ srs_error_t SrsGb28181RtmpMuxer::connect() void SrsGb28181RtmpMuxer::close() { srs_freep(sdk); + + // cleared and sequence header will be sent again next time. + // RTMP close may stop through API(rtmp_close) + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; } void SrsGb28181RtmpMuxer::rtmp_close(){ @@ -1157,6 +1240,9 @@ SrsGb28181StreamChannel::SrsGb28181StreamChannel(){ ssrc = 0; rtp_peer_port = 0; rtp_peer_ip = ""; + rtmp_url = ""; + recv_time = 0; + recv_time_str = ""; } SrsGb28181StreamChannel::~SrsGb28181StreamChannel() @@ -1177,6 +1263,12 @@ void SrsGb28181StreamChannel::copy(const SrsGb28181StreamChannel *s){ rtp_peer_ip = s->get_rtp_peer_ip(); rtp_peer_port = s->get_rtp_peer_port(); + + rtmp_url = s->get_rtmp_url(); + + recv_time_str = s->get_recv_time_str(); + recv_time = s->get_recv_time(); + } void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) @@ -1186,12 +1278,16 @@ void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) obj->set("rtmp_port", SrsJsonAny::integer(rtmp_port)); obj->set("app", SrsJsonAny::str(app.c_str())); obj->set("stream", SrsJsonAny::str(stream.c_str())); + obj->set("rtmp_url", SrsJsonAny::str(rtmp_url.c_str())); obj->set("ssrc", SrsJsonAny::integer(ssrc)); obj->set("rtp_port", SrsJsonAny::integer(rtp_port)); obj->set("port_mode", SrsJsonAny::str(port_mode.c_str())); obj->set("rtp_peer_port", SrsJsonAny::integer(rtp_peer_port)); obj->set("rtp_peer_ip", SrsJsonAny::str(rtp_peer_ip.c_str())); + obj->set("recv_time", SrsJsonAny::integer(recv_time/SRS_UTIME_SECONDS)); + obj->set("recv_time_str", SrsJsonAny::str(recv_time_str.c_str())); + } @@ -1272,10 +1368,12 @@ uint32_t SrsGb28181Manger::hash_code(std::string str) uint32_t SrsGb28181Manger::generate_ssrc(std::string id) { srand(uint(time(0))); - // TODO: SSRC rules can be customized, - //uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); - //uint32_t ssrc = 0x00FFFFF0 & (hash_code(id) << 4) | index; - uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); + // TODO: SSRC rules can be customized, + //gb28281 live ssrc max value 0999999999(3B9AC9FF) + //gb28281 vod ssrc max value 1999999999(773593FF) + uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); + uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index; + //uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); srs_trace("gb28181: generate ssrc id=%s, ssrc=%u", id.c_str(), ssrc); return ssrc; } @@ -1292,7 +1390,7 @@ srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsGb28 muxer = new SrsGb28181RtmpMuxer(this, id, config->audio_enable, config->wait_keyframe); if ((err = muxer->serve()) != srs_success) { - return srs_error_wrap(err, "gb28281: rtmp muxer serve %s", id.c_str()); + return srs_error_wrap(err, "gb28181: rtmp muxer serve %s", id.c_str()); } rtmpmuxers[id] = muxer; *gb28181 = muxer; @@ -1373,6 +1471,10 @@ void SrsGb28181Manger::remove(SrsGb28181RtmpMuxer* muxer) manager->remove(muxer); } +void SrsGb28181Manger::remove_sip_session(SrsGb28181SipSession* sess) +{ + manager->remove(sess); +} srs_error_t SrsGb28181Manger::start_ps_rtp_listen(std::string id, int port) { @@ -1441,24 +1543,12 @@ uint32_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *channe return ERROR_SUCCESS; } - if (channel->get_stream().empty()){ - channel->set_stream("[stream]"); - } - - if (channel->get_app().empty()){ - channel->set_stream("[app]"); - } - - if (channel->get_port_mode().empty()){ - channel->set_port_mode(RTP_PORT_MODE_FIXED); - } - - //create on rtmp muxer, gb28281 stream to rtmp + //create on rtmp muxer, gb28181 stream to rtmp srs_error_t err = srs_success; if ((err = fetch_or_create_rtmpmuxer(id, &muxer)) != srs_success){ srs_warn("gb28181: create rtmp muxer error, %s", srs_error_desc(err).c_str()); srs_freep(err); - return ERROR_GB28281_CREATER_RTMPMUXER_FAILED; + return ERROR_GB28181_CREATER_RTMPMUXER_FAILED; } //Start RTP listening port, receive gb28181 stream, @@ -1466,6 +1556,11 @@ uint32_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *channe //random is random allocation port int rtp_port = 0; std::string port_mode = channel->get_port_mode(); + + if (port_mode.empty()){ + port_mode = RTP_PORT_MODE_FIXED; + channel->set_port_mode(port_mode); + } if (port_mode == RTP_PORT_MODE_RANDOM){ alloc_port(&rtp_port); @@ -1477,7 +1572,7 @@ uint32_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *channe srs_warn("gb28181: start ps rtp listen error, %s", srs_error_desc(err).c_str()); srs_freep(err); free_port(rtp_port, rtp_port + 1); - return ERROR_GB28281_CREATER_RTMPMUXER_FAILED; + return ERROR_GB28181_CREATER_RTMPMUXER_FAILED; } } else if(port_mode == RTP_PORT_MODE_FIXED) { @@ -1487,38 +1582,75 @@ uint32_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *channe return ERROR_GB28181_PORT_MODE_INVALID; } - //Generate SSRC according to the hash code, - //of the string value of the id - uint32_t ssrc = generate_ssrc(id); + uint32_t ssrc = channel->get_ssrc(); + if (ssrc == 0){ + //auto generate SSRC according to the hash code, + //of the string value of the id + ssrc = generate_ssrc(id); + } rtmpmuxer_map_by_ssrc(muxer, ssrc); - //Generate RTMP push stream address, - std::string app = channel->get_app(); - std::string stream = channel->get_stream(); - app = srs_string_replace(app, "[app]", "live"); - stream = srs_string_replace(stream, "[stream]", id); - - std::string url = "rtmp://" + config->output + "/" + app + "/" + stream; + //generate RTMP push stream address, + //if the app and stream in the API are empty, + //RTMP URL is generated using the output template parameter + std::string url = ""; int rtmp_port; + string app = channel->get_app(); + string stream = channel->get_stream(); + if (true) { - std::string schema, host, vhost, param, _app, _stream; - srs_discovery_tc_url(url, schema, host, vhost, _app, _stream, rtmp_port, param); - url = srs_generate_rtmp_url(host, rtmp_port, "", "", app, stream, ""); + string tcUrl, stream_name; + + //get template rtmp url configuration + std::string output = config->output; + srs_parse_rtmp_url(output, tcUrl, stream_name); + + string _schema, _host, _vhost, _param, _app, _stream; + srs_discovery_tc_url(tcUrl, _schema, _host, _vhost, _app, _stream, rtmp_port, _param); + + //if the stream name is not parameterized, + //it needs to be parameterized to ensure that the stream name is different + if (!srs_string_contains(stream_name, "[stream]") && + !srs_string_contains(stream_name, "[timestamp]") && + !srs_string_contains(stream_name, "[ssrc]")){ + stream_name = stream_name + "_[stream]"; + } + + if (app.empty()){ + app = _app; + } + + if (stream.empty()) + { + stream = stream_name; + } + + url = srs_generate_rtmp_url(_host, rtmp_port, "", "", app, stream, ""); + url = srs_string_replace(url, "[app]", "live"); + url = srs_string_replace(url, "[stream]", id); std::stringstream ss; ss << ssrc; url = srs_string_replace(url, "[ssrc]", ss.str()); url = srs_path_build_timestamp(url); - } - muxer->set_rtmp_url(url); - srs_trace("gb28181: create new stream channel id:%s rtmp url=%s", id.c_str(), muxer->rtmp_url().c_str()); + + //update channel app stream value + srs_parse_rtmp_url(url, tcUrl, stream_name); + srs_discovery_tc_url(tcUrl, _schema, _host, _vhost, _app, _stream, rtmp_port, _param); + + //generate the value returned to the api response + channel->set_rtp_port(rtp_port); + channel->set_ssrc(ssrc); - //generate the value returned to the api response - channel->set_app(app); - channel->set_stream(stream); - channel->set_rtp_port(rtp_port); - channel->set_rtmp_port(rtmp_port); - channel->set_ip(config->host); - channel->set_ssrc(ssrc); + channel->set_app(_app); + channel->set_stream(stream_name); + channel->set_rtmp_port(rtmp_port); + channel->set_ip(config->host); + std::string play_url = srs_generate_rtmp_url(config->host, rtmp_port, "", "", app, stream_name, ""); + channel->set_rtmp_url(play_url); + } + + muxer->set_rtmp_url(url); + srs_trace("gb28181: create new stream channel id:%s rtmp url=%s", id.c_str(), url.c_str()); muxer->copy_channel(channel); @@ -1580,8 +1712,6 @@ uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int //channel not exist SrsGb28181StreamChannel channel; channel.set_channel_id(id); - channel.set_app("live"); - channel.set_stream(id); int code = create_stream_channel(&channel); if (code != ERROR_SUCCESS){ return code; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index b206a1ff2..4600ba010 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -65,6 +65,7 @@ class SrsGb28181Config; class SrsGb28181PsRtpProcessor; class SrsGb28181SipService; class SrsGb28181StreamChannel; +class SrsGb28181SipSession; //ps rtp header packet parse class SrsPsRtpPacket: public SrsRtpPacket @@ -76,7 +77,7 @@ public: virtual srs_error_t decode(SrsBuffer* stream); }; -//randomly assigned ports receive gb28281 device streams +//randomly assigned ports receive gb28181 device streams class SrsPsRtpListener: public ISrsUdpHandler { private: @@ -94,7 +95,7 @@ public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); }; -//multiplexing service, single port receiving all gb28281 device streams +//multiplexing service, single port receiving all gb28181 device streams class SrsGb28181RtpMuxService : public ISrsUdpHandler { private: @@ -110,7 +111,7 @@ public: }; -//process gb28281 RTP package, generate a completed PS stream data, +//process gb28181 RTP package, generate a completed PS stream data, //call the PS stream parser, parse the original video and audio class SrsGb28181PsRtpProcessor: public ISrsUdpHandler { @@ -127,6 +128,7 @@ public: private: bool can_send_ps_av_packet(); void dispose(); + void clear_pre_packet(); // Interface ISrsUdpHandler public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); @@ -188,6 +190,7 @@ private: SrsFileWriter ps_fw; SrsFileWriter video_fw; SrsFileWriter audio_fw; + SrsFileWriter unknow_fw; bool first_keyframe_flag; bool wait_first_keyframe; @@ -234,9 +237,6 @@ private: SrsRawH264Stream* avc; std::string h264_sps; std::string h264_pps; - bool h264_sps_changed; - bool h264_pps_changed; - bool h264_sps_pps_sent; SrsRawAacStream* aac; std::string aac_specific_config; @@ -262,6 +262,7 @@ public: virtual void set_rtmp_url(std::string url); virtual std::string rtmp_url(); virtual SrsGb28181StreamChannel get_channel(); + srs_utime_t get_recv_stream_time(); private: virtual srs_error_t do_cycle(); @@ -288,7 +289,7 @@ public: virtual void rtmp_close(); }; -//system parameter configuration of gb28281 module, +//system parameter configuration of gb28181 module, //read file from configuration file to generate class SrsGb28181Config { @@ -301,14 +302,15 @@ public: int rtp_port_min; int rtp_port_max; int rtp_mux_port; + bool auto_create_channel; //sip config int sip_port; std::string sip_serial; std::string sip_realm; bool sip_enable; - int sip_ack_timeout; - int sip_keepalive_timeout; + srs_utime_t sip_ack_timeout; + srs_utime_t sip_keepalive_timeout; bool print_sip_message; bool sip_auto_play; bool sip_invite_port_fixed; @@ -325,11 +327,14 @@ private: std::string port_mode; std::string app; std::string stream; + std::string rtmp_url; std::string ip; int rtp_port; int rtmp_port; uint32_t ssrc; + srs_utime_t recv_time; + std::string recv_time_str; //send rtp stream client local port int rtp_peer_port; @@ -350,6 +355,9 @@ public: uint32_t get_ssrc() const { return ssrc; } uint32_t get_rtp_peer_port() const { return rtp_peer_port; } std::string get_rtp_peer_ip() const { return rtp_peer_ip; } + std::string get_rtmp_url() const { return rtmp_url; } + srs_utime_t get_recv_time() const { return recv_time; } + std::string get_recv_time_str() const { return recv_time_str; } void set_channel_id(const std::string &i) { channel_id = i; } void set_port_mode(const std::string &p) { port_mode = p; } @@ -361,6 +369,9 @@ public: void set_ssrc( const int &s) { ssrc = s;} void set_rtp_peer_ip( const std::string &p) { rtp_peer_ip = p; } void set_rtp_peer_port( const int &s) { rtp_peer_port = s;} + void set_rtmp_url( const std::string &u) { rtmp_url = u; } + void set_recv_time( const srs_utime_t &u) { recv_time = u; } + void set_recv_time_str( const std::string &u) { recv_time_str = u; } void copy(const SrsGb28181StreamChannel *s); void dumps(SrsJsonObject* obj); @@ -429,7 +440,7 @@ public: public: void remove(SrsGb28181RtmpMuxer* conn); - + void remove_sip_session(SrsGb28181SipSession* sess); }; #endif diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index 72a55685c..2c819ad17 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -47,11 +47,34 @@ using namespace std; #include +std::string srs_get_sip_session_status_str(SrsGb28181SipSessionStatusType status) +{ + switch(status){ + case SrsGb28181SipSessionRegisterOk: + return "RegisterOk"; + case SrsGb28181SipSessionAliveOk: + return "AliveOk"; + case SrsGb28181SipSessionInviteOk: + return "InviteOk"; + case SrsGb28181SipSessionTrying: + return "InviteTrying"; + case SrsGb28181SipSessionBye: + return "InviteBye"; + default: + return "Unknow"; + } +} + SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r) { - caster = c; + servcie = c; req = new SrsSipRequest(); req->copy(r); + _session_id = req->sip_auth_id; + _reg_expires = 3600 * SRS_UTIME_SECONDS; + + trd = new SrsSTCoroutine("gb28181sip", this); + pprint = SrsPithyPrint::create_caster(); _register_status = SrsGb28181SipSessionUnkonw; _alive_status = SrsGb28181SipSessionUnkonw; @@ -59,19 +82,113 @@ SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipReques _register_time = 0; _alive_time = 0; _invite_time = 0; - _recv_rtp_time = 0; - _reg_expires = 0; - + _peer_ip = ""; _peer_port = 0; - _from = NULL; _fromlen = 0; } SrsGb28181SipSession::~SrsGb28181SipSession() { srs_freep(req); + srs_freep(trd); + srs_freep(pprint); +} + +srs_error_t SrsGb28181SipSession::serve() +{ + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "gb28181sip"); + } + + return err; +} + +srs_error_t SrsGb28181SipSession::do_cycle() +{ + srs_error_t err = srs_success; + _register_time = srs_get_system_time(); + _alive_time = srs_get_system_time(); + _invite_time = srs_get_system_time(); + + while (true) { + + pprint->elapse(); + + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "gb28181 sip session cycle"); + } + + srs_utime_t now = srs_get_system_time(); + srs_utime_t reg_duration = now - _register_time; + srs_utime_t alive_duration = now - _alive_time; + srs_utime_t invite_duration = now - _invite_time; + SrsGb28181Config *config = servcie->get_config(); + + + if (_register_status == SrsGb28181SipSessionRegisterOk && + reg_duration > _reg_expires){ + srs_trace("gb28181: sip session=%s register expire", _session_id.c_str()); + break; + } + + if (_register_status == SrsGb28181SipSessionRegisterOk && + _alive_status == SrsGb28181SipSessionAliveOk && + alive_duration > config->sip_keepalive_timeout){ + srs_trace("gb28181: sip session=%s keepalive timeout", _session_id.c_str()); + break; + } + + if (_invite_status == SrsGb28181SipSessionTrying && + invite_duration > config->sip_ack_timeout){ + _invite_status == SrsGb28181SipSessionUnkonw; + } + + if (pprint->can_print()){ + srs_trace("gb28181: sip session=%s peer(%s, %d) status(%s,%s,%s) duration(%u,%u,%u)", + _session_id.c_str(), _peer_ip.c_str(), _peer_port, + srs_get_sip_session_status_str(_register_status).c_str(), + srs_get_sip_session_status_str(_alive_status).c_str(), + srs_get_sip_session_status_str(_invite_status).c_str(), + (reg_duration / SRS_UTIME_SECONDS), + (alive_duration / SRS_UTIME_SECONDS), + (invite_duration / SRS_UTIME_SECONDS)); + + //It is possible that the camera head keeps pushing and opening, + //and the duration will be very large. It will take 1 day to update + if (invite_duration > 24 * SRS_UTIME_HOURS){ + _invite_time = srs_get_system_time(); + } + } + srs_usleep(5* SRS_UTIME_SECONDS); + } + + return err; +} + +std::string SrsGb28181SipSession::remote_ip() +{ + return _peer_ip; +} + +srs_error_t SrsGb28181SipSession::cycle() +{ + srs_error_t err = do_cycle(); + + servcie->remove_session(_session_id); + srs_trace("gb28181: client id=%s sip session is remove", _session_id.c_str()); + + if (err == srs_success) { + srs_trace("gb28181: sip client finished."); + } else if (srs_is_client_gracefully_close(err)) { + srs_warn("gb28181: sip client disconnect code=%d", srs_error_code(err)); + srs_freep(err); + } + + return err; } //gb28181 sip Service @@ -93,6 +210,11 @@ SrsGb28181SipService::~SrsGb28181SipService() srs_freep(config); } +SrsGb28181Config* SrsGb28181SipService::get_config() +{ + return config; +} + void SrsGb28181SipService::set_stfd(srs_netfd_t fd) { lfd = fd; @@ -123,11 +245,8 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, { srs_error_t err = srs_success; - if (config->print_sip_message) - { - srs_trace("gb28181: request peer_ip=%s, peer_port=%d nbbuf=%d", peer_ip.c_str(), peer_port, nb_buf); - srs_trace("gb28181: request recv message=%s", buf); - } + srs_info("gb28181: request peer(%s, %d) nbbuf=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_info("gb28181: request recv message=%s", buf); if (nb_buf < 10) { return err; @@ -138,14 +257,7 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, if ((err = sip->parse_request(&req, buf, nb_buf)) != srs_success) { return srs_error_wrap(err, "parse sip request"); } - - if (config->print_sip_message) - { - srs_trace("gb28181: %s method=%s, uri=%s, version=%s ", - req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); - } - + req->peer_ip = peer_ip; req->peer_port = peer_port; SrsAutoFree(SrsSipRequest, req); @@ -155,27 +267,28 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, if (req->is_register()) { std::vector serial = srs_string_split(srs_string_replace(req->uri,"sip:", ""), "@"); if (serial.at(0) != config->sip_serial){ - srs_trace("gb28181: client:%s request serial and server serial inconformity(%s:%s)", + srs_warn("gb28181: client:%s request serial and server serial inconformity(%s:%s)", req->sip_auth_id.c_str(), serial.at(0).c_str(), config->sip_serial.c_str()); return err; } - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); - srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", - req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); + srs_trace("gb28181: request client id=%s peer(%s, %d)", req->sip_auth_id.c_str(), peer_ip.c_str(), peer_port); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s expires=%d", + req->get_cmdtype_str().c_str(), req->method.c_str(), + req->uri.c_str(), req->version.c_str(), req->expires); - SrsGb28181SipSession* sip_session = create_sip_session(req); - if (!sip_session) { - srs_trace("gb28181: create sip session faild:%s", req->uri.c_str()); + SrsGb28181SipSession* sip_session = NULL; + if ((err = fetch_or_create_sip_session(req, &sip_session)) != srs_success) { + srs_error_wrap(err, "create sip session error!"); return err; } + srs_assert(sip_session); send_status(req, from, fromlen); sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); sip_session->set_register_time(srs_get_system_time()); sip_session->set_reg_expires(req->expires); - sip_session->set_sockaddr(from); + sip_session->set_sockaddr((sockaddr)*from); sip_session->set_sockaddr_len(fromlen); sip_session->set_peer_ip(peer_ip); sip_session->set_peer_port(peer_port); @@ -188,11 +301,11 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, //reponse status send_status(req, from, fromlen); - sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); - sip_session->set_register_time(srs_get_system_time()); + //sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); + //sip_session->set_register_time(srs_get_system_time()); sip_session->set_alive_status(SrsGb28181SipSessionAliveOk); sip_session->set_alive_time(srs_get_system_time()); - sip_session->set_sockaddr(from); + sip_session->set_sockaddr((sockaddr)*from); sip_session->set_sockaddr_len(fromlen); sip_session->set_peer_port(peer_port); sip_session->set_peer_ip(peer_ip); @@ -203,14 +316,14 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, sip_session->alive_status() == SrsGb28181SipSessionAliveOk && sip_session->invite_status() == SrsGb28181SipSessionUnkonw) { - //stop the possible stream and push a new stream - //send_bye(req, from, fromlen); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), + req->method.c_str(), req->uri.c_str(), req->version.c_str()); SrsGb28181StreamChannel ch; ch.set_channel_id(session_id); ch.set_ip(config->host); - ch.set_stream(session_id); - ch.set_app("live"); if (config->sip_invite_port_fixed){ ch.set_port_mode(RTP_PORT_MODE_FIXED); }else { @@ -232,16 +345,18 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, }else if (req->is_invite()) { SrsGb28181SipSession* sip_session = fetch(session_id); - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); if (!sip_session){ - send_bye(req); srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return err; } + + sip_session->set_sockaddr((sockaddr)*from); + sip_session->set_sockaddr_len(fromlen); if (sip_session->register_status() == SrsGb28181SipSessionUnkonw || sip_session->alive_status() == SrsGb28181SipSessionUnkonw) { @@ -258,14 +373,14 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, sip_session->set_request(req); }else{ sip_session->set_invite_status(SrsGb28181SipSessionUnkonw); - sip_session->set_invite_time(0); + sip_session->set_invite_time(srs_get_system_time()); } }else if (req->is_bye()) { - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); - + SrsGb28181SipSession* sip_session = fetch(session_id); send_status(req, from, fromlen); @@ -273,9 +388,12 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return err; } + + sip_session->set_sockaddr((sockaddr)*from); + sip_session->set_sockaddr_len(fromlen); sip_session->set_invite_status(SrsGb28181SipSessionBye); - sip_session->set_invite_time(0); + sip_session->set_invite_time(srs_get_system_time()); }else{ srs_trace("gb28181: ingor request method=%s", req->method.c_str()); @@ -287,8 +405,7 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, int SrsGb28181SipService::send_message(sockaddr* from, int fromlen, std::stringstream& ss) { std::string str = ss.str(); - if (config->print_sip_message) - srs_trace("gb28181: send_message:%s", str.c_str()); + srs_info("gb28181: send_message:%s", str.c_str()); srs_assert(!str.empty()); int ret = srs_sendto(lfd, (char*)str.c_str(), (int)str.length(), from, fromlen, SRS_UTIME_NO_TIMEOUT); @@ -311,7 +428,7 @@ int SrsGb28181SipService::send_ack(SrsSipRequest *req, sockaddr *f, int l) req->realm = config->sip_realm; req->serial = config->sip_serial; - sip->resp_ack(ss, req); + sip->req_ack(ss, req); return send_message(f, l, ss); } @@ -345,7 +462,7 @@ int SrsGb28181SipService::send_invite(SrsSipRequest *req, string ip, int port, //you cannot invite again. you need to 'bye' and try again if (sip_session->invite_status() == SrsGb28181SipSessionTrying || sip_session->invite_status() == SrsGb28181SipSessionInviteOk){ - return ERROR_GB28281_SIP_IS_INVITING; + return ERROR_GB28181_SIP_IS_INVITING; } req->host = config->host; @@ -356,9 +473,11 @@ int SrsGb28181SipService::send_invite(SrsSipRequest *req, string ip, int port, std::stringstream ss; sip->req_invite(ss, req, ip, port, ssrc); - if (send_message(sip_session->sockaddr_from(), sip_session->sockaddr_fromlen(), ss) <= 0) + sockaddr addr = sip_session->sockaddr_from(); + + if (send_message(&addr, sip_session->sockaddr_fromlen(), ss) <= 0) { - return ERROR_GB28281_SIP_INVITE_FAILED; + return ERROR_GB28181_SIP_INVITE_FAILED; } sip_session->set_invite_status(SrsGb28181SipSessionTrying); @@ -391,17 +510,15 @@ int SrsGb28181SipService::send_bye(SrsSipRequest *req) std::stringstream ss; sip->req_bye(ss, req); - if (send_message(sip_session->sockaddr_from(), sip_session->sockaddr_fromlen(), ss) <= 0) + sockaddr addr = sip_session->sockaddr_from(); + if (send_message(&addr, sip_session->sockaddr_fromlen(), ss) <= 0) { - return ERROR_GB28281_SIP_BYE_FAILED; + return ERROR_GB28181_SIP_BYE_FAILED; } return ERROR_SUCCESS; - - } - int SrsGb28181SipService::send_sip_raw_data(SrsSipRequest *req, std::string data) { srs_assert(req); @@ -415,27 +532,34 @@ int SrsGb28181SipService::send_sip_raw_data(SrsSipRequest *req, std::string dat std::stringstream ss; ss << data; - if (send_message(sip_session->sockaddr_from(), sip_session->sockaddr_fromlen(), ss) <= 0) + sockaddr addr = sip_session->sockaddr_from(); + if (send_message(&addr, sip_session->sockaddr_fromlen(), ss) <= 0) { - return ERROR_GB28281_SIP_BYE_FAILED; + return ERROR_GB28181_SIP_BYE_FAILED; } return ERROR_SUCCESS; } -SrsGb28181SipSession* SrsGb28181SipService::create_sip_session(SrsSipRequest *req) +srs_error_t SrsGb28181SipService::fetch_or_create_sip_session(SrsSipRequest *req, SrsGb28181SipSession** sip_session) { - SrsGb28181SipSession *sess = NULL; - - std::map::iterator it = sessions.find(req->sip_auth_id); - if (it == sessions.end()){ - sess = new SrsGb28181SipSession(this, req); - }else{ - return it->second; - } + srs_error_t err = srs_success; + SrsGb28181SipSession* sess = NULL; + if ((sess = fetch(req->sip_auth_id)) != NULL) { + *sip_session = sess; + return err; + } + + sess = new SrsGb28181SipSession(this, req);; + if ((err = sess->serve()) != srs_success) { + return srs_error_wrap(err, "gb28181: sip serssion serve %s", req->sip_auth_id.c_str()); + } + sessions[req->sip_auth_id] = sess; - return sess; + *sip_session = sess; + + return err; } SrsGb28181SipSession* SrsGb28181SipService::fetch(std::string sid) @@ -452,7 +576,9 @@ void SrsGb28181SipService::remove_session(std::string sid) { std::map::iterator it = sessions.find(sid); if (it != sessions.end()){ - srs_freep(it->second); + //srs_freep(it->second); + //thread exit management by gb28181 manger + _srs_gb28181->remove_sip_session(it->second); sessions.erase(it); } } @@ -463,7 +589,9 @@ void SrsGb28181SipService::destroy() //destory all sip session std::map::iterator it; for (it = sessions.begin(); it != sessions.end(); ++it) { - srs_freep(it->second); + //srs_freep(it->second); + //thread exit management by gb28181 manger + _srs_gb28181->remove_sip_session(it->second); } sessions.clear(); } diff --git a/trunk/src/app/srs_app_gb28181_sip.hpp b/trunk/src/app/srs_app_gb28181_sip.hpp index 9206e66c9..d42d59e41 100644 --- a/trunk/src/app/srs_app_gb28181_sip.hpp +++ b/trunk/src/app/srs_app_gb28181_sip.hpp @@ -33,6 +33,7 @@ #include #include #include +#include class SrsConfDirective; @@ -41,7 +42,7 @@ class SrsGb28181Config; class SrsSipStack; class SrsGb28181SipService; -enum SrsGb28281SipSessionStatusType{ +enum SrsGb28181SipSessionStatusType{ SrsGb28181SipSessionUnkonw = 0, SrsGb28181SipSessionRegisterOk = 1, SrsGb28181SipSessionAliveOk = 2, @@ -50,62 +51,75 @@ enum SrsGb28281SipSessionStatusType{ SrsGb28181SipSessionBye = 5, }; -class SrsGb28181SipSession +class SrsGb28181SipSession: public ISrsCoroutineHandler, public ISrsConnection { private: //SrsSipRequest *req; - SrsGb28181SipService *caster; - std::string session_id; + SrsGb28181SipService *servcie; + std::string _session_id; + SrsCoroutine* trd; + SrsPithyPrint* pprint; private: - SrsGb28281SipSessionStatusType _register_status; - SrsGb28281SipSessionStatusType _alive_status; - SrsGb28281SipSessionStatusType _invite_status; + SrsGb28181SipSessionStatusType _register_status; + SrsGb28181SipSessionStatusType _alive_status; + SrsGb28181SipSessionStatusType _invite_status; srs_utime_t _register_time; srs_utime_t _alive_time; srs_utime_t _invite_time; - srs_utime_t _recv_rtp_time; - int _reg_expires; + srs_utime_t _reg_expires; std::string _peer_ip; int _peer_port; - sockaddr *_from; + sockaddr _from; int _fromlen; SrsSipRequest *req; -public: - void set_register_status(SrsGb28281SipSessionStatusType s) { _register_status = s;} - void set_alive_status(SrsGb28281SipSessionStatusType s) { _alive_status = s;} - void set_invite_status(SrsGb28281SipSessionStatusType s) { _invite_status = s;} - void set_register_time(srs_utime_t t) { _register_time = t;} - void set_alive_time(srs_utime_t t) { _alive_time = t;} - void set_invite_time(srs_utime_t t) { _invite_time = t;} - void set_recv_rtp_time(srs_utime_t t) { _recv_rtp_time = t;} - void set_reg_expires(int e) { _reg_expires = e;} - void set_peer_ip(std::string i) { _peer_ip = i;} - void set_peer_port(int o) { _peer_port = o;} - void set_sockaddr(sockaddr *f) { _from = f;} - void set_sockaddr_len(int l) { _fromlen = l;} - void set_request(SrsSipRequest *r) { req->copy(r);} - - SrsGb28281SipSessionStatusType register_status() { return _register_status;} - SrsGb28281SipSessionStatusType alive_status() { return _alive_status;} - SrsGb28281SipSessionStatusType invite_status() { return _invite_status;} - srs_utime_t register_time() { return _register_time;} - srs_utime_t alive_time() { return _alive_time;} - srs_utime_t invite_time() { return _invite_time;} - srs_utime_t recv_rtp_time() { return _recv_rtp_time;} - int reg_expires() { return _reg_expires;} - std::string peer_ip() { return _peer_ip;} - int peer_port() { return _peer_port;} - sockaddr* sockaddr_from() { return _from;} - int sockaddr_fromlen() { return _fromlen;} - SrsSipRequest request() { return *req;} - public: SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r); virtual ~SrsGb28181SipSession(); +public: + void set_register_status(SrsGb28181SipSessionStatusType s) { _register_status = s;} + void set_alive_status(SrsGb28181SipSessionStatusType s) { _alive_status = s;} + void set_invite_status(SrsGb28181SipSessionStatusType s) { _invite_status = s;} + void set_register_time(srs_utime_t t) { _register_time = t;} + void set_alive_time(srs_utime_t t) { _alive_time = t;} + void set_invite_time(srs_utime_t t) { _invite_time = t;} + //void set_recv_rtp_time(srs_utime_t t) { _recv_rtp_time = t;} + void set_reg_expires(int e) { _reg_expires = e*SRS_UTIME_SECONDS;} + void set_peer_ip(std::string i) { _peer_ip = i;} + void set_peer_port(int o) { _peer_port = o;} + void set_sockaddr(sockaddr f) { _from = f;} + void set_sockaddr_len(int l) { _fromlen = l;} + void set_request(SrsSipRequest *r) { req->copy(r);} + + + SrsGb28181SipSessionStatusType register_status() { return _register_status;} + SrsGb28181SipSessionStatusType alive_status() { return _alive_status;} + SrsGb28181SipSessionStatusType invite_status() { return _invite_status;} + srs_utime_t register_time() { return _register_time;} + srs_utime_t alive_time() { return _alive_time;} + srs_utime_t invite_time() { return _invite_time;} + //srs_utime_t recv_rtp_time() { return _recv_rtp_time;} + int reg_expires() { return _reg_expires;} + std::string peer_ip() { return _peer_ip;} + int peer_port() { return _peer_port;} + sockaddr sockaddr_from() { return _from;} + int sockaddr_fromlen() { return _fromlen;} + SrsSipRequest request() { return *req;} + + std::string session_id() { return _session_id;} + +public: + virtual srs_error_t serve(); + +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); + virtual std::string remote_ip(); +private: + virtual srs_error_t do_cycle(); }; class SrsGb28181SipService : public ISrsUdpHandler @@ -152,9 +166,12 @@ public: // int send_sip_raw_data(SrsSipRequest *req, std::string data); - SrsGb28181SipSession* create_sip_session(SrsSipRequest *req); +public: + srs_error_t fetch_or_create_sip_session(SrsSipRequest *req, SrsGb28181SipSession** sess); SrsGb28181SipSession* fetch(std::string id); void remove_session(std::string id); + SrsGb28181Config* get_config(); + }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index f4045ce40..224f4790e 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -696,7 +696,7 @@ void SrsServer::destroy() srs_freep(conn_manager); #ifdef SRS_AUTO_GB28181 - //free global gb28281 manager + //free global gb28181 manager srs_freep(_srs_gb28181); #endif } @@ -1360,7 +1360,7 @@ srs_error_t SrsServer::listen_http_stream() } #ifdef SRS_AUTO_GB28181 -srs_error_t SrsServer::listen_gb28281_sip(SrsConfDirective* stream_caster) +srs_error_t SrsServer::listen_gb28181_sip(SrsConfDirective* stream_caster) { srs_error_t err = srs_success; @@ -1411,7 +1411,7 @@ srs_error_t SrsServer::listen_stream_caster() listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster); } else if (srs_stream_caster_is_gb28181(caster)) { #ifdef SRS_AUTO_GB28181 - //init global gb28281 manger + //init global gb28181 manger if (_srs_gb28181 == NULL){ _srs_gb28181 = new SrsGb28181Manger(stream_caster); if ((err = _srs_gb28181->initialize()) != srs_success){ @@ -1421,12 +1421,12 @@ srs_error_t SrsServer::listen_stream_caster() //sip listener if (_srs_config->get_stream_caster_gb28181_sip_enable(stream_caster)){ - if ((err = listen_gb28281_sip(stream_caster)) != srs_success){ + if ((err = listen_gb28181_sip(stream_caster)) != srs_success){ return err; } } - //gb28281 stream listener + //gb28181 stream listener listener = new SrsGb28181Listener(this, SrsListenerGb28181RtpMux, stream_caster); #else srs_warn("gb28181 is disabled, please enable it by: ./configure --with-gb28181"); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 0010929f5..1147a5a35 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -324,7 +324,7 @@ private: virtual srs_error_t listen_http_stream(); virtual srs_error_t listen_stream_caster(); #ifdef SRS_AUTO_GB28181 - virtual srs_error_t listen_gb28281_sip(SrsConfDirective* c); + virtual srs_error_t listen_gb28181_sip(SrsConfDirective* c); #endif // Close the listeners for specified type, // Remove the listen object from manager. diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 65b857021..5a6b8c6e5 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -359,10 +359,10 @@ #define ERROR_GB28181_VALUE_EMPTY 6005 #define ERROR_GB28181_ACTION_INVALID 6006 #define ERROR_GB28181_SIP_NOT_RUN 6007 -#define ERROR_GB28281_SIP_INVITE_FAILED 6008 -#define ERROR_GB28281_SIP_BYE_FAILED 6009 -#define ERROR_GB28281_SIP_IS_INVITING 6010 -#define ERROR_GB28281_CREATER_RTMPMUXER_FAILED 6011 +#define ERROR_GB28181_SIP_INVITE_FAILED 6008 +#define ERROR_GB28181_SIP_BYE_FAILED 6009 +#define ERROR_GB28181_SIP_IS_INVITING 6010 +#define ERROR_GB28181_CREATER_RTMPMUXER_FAILED 6011 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_sip_stack.cpp b/trunk/src/protocol/srs_sip_stack.cpp index c0b06e55b..a5108fe7a 100644 --- a/trunk/src/protocol/srs_sip_stack.cpp +++ b/trunk/src/protocol/srs_sip_stack.cpp @@ -345,7 +345,7 @@ srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_m } } else if (!strcasecmp(phead, "via:")) { - std::vector vec_seq = srs_string_split(content, ";"); + //std::vector vec_via = srs_string_split(content, ";"); req->via = content; req->branch = srs_sip_get_param(content.c_str(), "branch"); } @@ -417,7 +417,52 @@ srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_m return err; } -void SrsSipStack::resp_keepalive(std::stringstream& ss, SrsSipRequest *req){ +std::string SrsSipStack::get_sip_from(SrsSipRequest const *req) +{ + std::string from_tag; + if (req->from_tag.empty()){ + from_tag = ""; + }else { + from_tag = ";tag=" + req->from_tag; + } + + return "from + ">" + from_tag; +} + +std::string SrsSipStack::get_sip_to(SrsSipRequest const *req) +{ + std::string to_tag; + if (req->to_tag.empty()){ + to_tag = ""; + }else { + to_tag = ";tag=" + req->to_tag; + } + + return "to + ">" + to_tag; +} + +std::string SrsSipStack::get_sip_via(SrsSipRequest const *req) +{ + std::string via = srs_string_replace(req->via, SRS_SIP_VERSION"/UDP ", ""); + std::vector vec_via = srs_string_split(via, ";"); + std::string ip_port = vec_via.at(0); + std::vector vec_ip_port = srs_string_split(ip_port, ":"); + + std::string branch, rport, received; + if (req->branch.empty()){ + branch = ""; + }else { + branch = ";branch=" + req->branch; + } + + received = ";received=" + vec_ip_port.at(0); + rport = ";rport=" + vec_ip_port.at(1); + + return SRS_SIP_VERSION"/UDP " + ip_port + rport + received + branch; +} + +void SrsSipStack::resp_keepalive(std::stringstream& ss, SrsSipRequest *req) +{ ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF << "Via: " << SRS_SIP_VERSION << "/UDP " << req->host << ":" << req->host_port << ";branch=" << req->branch << SRS_RTSP_CRLF << "From: from.c_str() << ">;tag=" << req->from_tag << SRS_RTSP_CRLF @@ -466,9 +511,9 @@ void SrsSipStack::resp_status(stringstream& ss, SrsSipRequest *req) } ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF - << "Via: " << req->via << SRS_RTSP_CRLF - << "From: from << ">" << SRS_RTSP_CRLF - << "To: to << ">" << SRS_RTSP_CRLF + << "Via: " << get_sip_via(req) << SRS_RTSP_CRLF + << "From: "<< get_sip_from(req) << SRS_RTSP_CRLF + << "To: "<< get_sip_to(req) << SRS_RTSP_CRLF << "CSeq: "<< req->seq << " " << req->method << SRS_RTSP_CRLF << "Call-ID: " << req->call_id << SRS_RTSP_CRLF << "Contact: " << req->contact << SRS_RTSP_CRLF @@ -511,9 +556,9 @@ void SrsSipStack::resp_status(stringstream& ss, SrsSipRequest *req) */ ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF - << "Via: " << req->via << SRS_RTSP_CRLF - << "From: from << ">" << SRS_RTSP_CRLF - << "To: to << ">" << SRS_RTSP_CRLF + << "Via: " << get_sip_via(req) << SRS_RTSP_CRLF + << "From: " << get_sip_from(req) << SRS_RTSP_CRLF + << "To: "<< get_sip_to(req) << SRS_RTSP_CRLF << "CSeq: "<< req->seq << " " << req->method << SRS_RTSP_CRLF << "Call-ID: " << req->call_id << SRS_RTSP_CRLF << "User-Agent: " << SRS_SIP_USER_AGENT << SRS_RTSP_CRLF @@ -635,8 +680,8 @@ void SrsSipStack::req_invite(stringstream& ss, SrsSipRequest *req, string ip, in ss << "INVITE " << req->uri << " " << SRS_SIP_VERSION << SRS_RTSP_CRLF << "Via: " << SRS_SIP_VERSION << "/UDP "<< req->host << ":" << req->host_port << ";rport;branch=" << req->branch << SRS_RTSP_CRLF - << "From: from << ">;tag=" << req->from_tag << SRS_RTSP_CRLF - << "To: to << ">" << SRS_RTSP_CRLF + << "From: " << get_sip_from(req) << SRS_RTSP_CRLF + << "To: " << get_sip_to(req) << SRS_RTSP_CRLF << "Call-ID: 20000" << rand <via << SRS_RTSP_CRLF - << "Via: " << req->via << ";rport=" << req->peer_port << ";received=" << req->peer_ip << ";branch=" << req->branch << SRS_RTSP_CRLF - << "From: from << ">" << SRS_RTSP_CRLF - << "To: to << ">" << SRS_RTSP_CRLF + << "Via: " << get_sip_via(req) << SRS_RTSP_CRLF + << "From: " << get_sip_from(req) << SRS_RTSP_CRLF + << "To: " << get_sip_to(req) << SRS_RTSP_CRLF << "CSeq: "<< req->seq << " " << req->method << SRS_RTSP_CRLF << "Call-ID: " << req->call_id << SRS_RTSP_CRLF << "Contact: " << req->contact << SRS_RTSP_CRLF @@ -678,7 +723,7 @@ void SrsSipStack::req_401_unauthorized(std::stringstream& ss, SrsSipRequest *req return; } -void SrsSipStack::resp_ack(std::stringstream& ss, SrsSipRequest *req){ +void SrsSipStack::req_ack(std::stringstream& ss, SrsSipRequest *req){ /* //request: sip-agent <------ ACK ------- sip-server ACK sip:34020000001320000003@3402000000 SIP/2.0 @@ -738,31 +783,13 @@ void SrsSipStack::req_bye(std::stringstream& ss, SrsSipRequest *req) req->to = to.str(); req->uri = uri.str(); - string to_tag, from_tag, branch; - - if (req->branch.empty()){ - branch = ""; - }else { - branch = ";branch=" + req->branch; - } - - if (req->from_tag.empty()){ - from_tag = ""; - }else { - from_tag = ";tag=" + req->from_tag; - } - - if (req->to_tag.empty()){ - to_tag = ""; - }else { - to_tag = ";tag=" + req->to_tag; - } - int seq = srs_sip_random(22, 99); ss << "BYE " << req->uri << " "<< SRS_SIP_VERSION << SRS_RTSP_CRLF - << "Via: "<< SRS_SIP_VERSION << "/UDP "<< req->host << ":" << req->host_port << ";rport" << branch << SRS_RTSP_CRLF - << "From: from << ">" << from_tag << SRS_RTSP_CRLF - << "To: to << ">" << to_tag << SRS_RTSP_CRLF + //<< "Via: "<< SRS_SIP_VERSION << "/UDP "<< req->host << ":" << req->host_port << ";rport" << branch << SRS_RTSP_CRLF + << "Via: " << SRS_SIP_VERSION << "/UDP " << req->host << ":" << req->host_port << ";rport;branch=" << req->branch << SRS_RTSP_CRLF + << "From: " << get_sip_from(req) << SRS_RTSP_CRLF + << "To: " << get_sip_to(req) << SRS_RTSP_CRLF + //bye callid is inivte callid << "Call-ID: " << req->call_id << SRS_RTSP_CRLF << "CSeq: "<< seq <<" BYE" << SRS_RTSP_CRLF << "Max-Forwards: 70" << SRS_RTSP_CRLF diff --git a/trunk/src/protocol/srs_sip_stack.hpp b/trunk/src/protocol/srs_sip_stack.hpp index 006ecbba3..6e6c47d7c 100644 --- a/trunk/src/protocol/srs_sip_stack.hpp +++ b/trunk/src/protocol/srs_sip_stack.hpp @@ -55,6 +55,8 @@ enum SrsSipCmdType{ SrsSipCmdRespone=1 }; +std::string srs_sip_get_utc_date(); + class SrsSipRequest { public: @@ -130,12 +132,22 @@ public: protected: virtual srs_error_t do_parse_request(SrsSipRequest* req, const char *recv_msg); +private: + //response from + virtual std::string get_sip_from(SrsSipRequest const *req); + //response to + virtual std::string get_sip_to(SrsSipRequest const *req); + //response via + virtual std::string get_sip_via(SrsSipRequest const *req); + public: + //response: request sent by the sip-agent, wait for sip-server response virtual void resp_status(std::stringstream& ss, SrsSipRequest *req); virtual void resp_keepalive(std::stringstream& ss, SrsSipRequest *req); - virtual void resp_ack(std::stringstream& ss, SrsSipRequest *req); - + + //request: request sent by the sip-server, wait for sip-agent response virtual void req_invite(std::stringstream& ss, SrsSipRequest *req, std::string ip, int port, uint32_t ssrc); + virtual void req_ack(std::stringstream& ss, SrsSipRequest *req); virtual void req_bye(std::stringstream& ss, SrsSipRequest *req); virtual void req_401_unauthorized(std::stringstream& ss, SrsSipRequest *req); diff --git a/trunk/src/service/srs_service_http_conn.cpp b/trunk/src/service/srs_service_http_conn.cpp index 42e8a0150..40f40f18f 100644 --- a/trunk/src/service/srs_service_http_conn.cpp +++ b/trunk/src/service/srs_service_http_conn.cpp @@ -120,6 +120,7 @@ srs_error_t SrsHttpParser::parse_message_imp(ISrsReader* reader) if (buffer->size() > 0) { ssize_t consumed = http_parser_execute(&parser, &settings, buffer->bytes(), buffer->size()); + // The error is set in http_errno. enum http_errno code; if ((code = HTTP_PARSER_ERRNO(&parser)) != HPE_OK) { @@ -139,6 +140,8 @@ srs_error_t SrsHttpParser::parse_message_imp(ISrsReader* reader) } } } + + //TODO: fixme 'nparsed' undefined, open info compile error! srs_info("size=%d, nparsed=%d, consumed=%d", buffer->size(), (int)nparsed, consumed); // Only consume the header bytes.