diff --git a/res/gstencoder.vala b/res/gstencoder.vala index 66cafc6..4d64e98 100644 --- a/res/gstencoder.vala +++ b/res/gstencoder.vala @@ -110,25 +110,25 @@ internal class GstEncoder : DispdEncoder, GLib.Object ? configs.get(DispdEncoderConfig.FRAMERATE).get_uint32() : 30; StringBuilder desc = new StringBuilder(); - desc.append_printf(""" - ximagesrc name=vsrc use-damage=false show-pointer=false - startx=%u starty=%u endx=%u endy=%u - ! video/x-raw, framerate=%u/1 - ! videoscale method=0 - ! video/x-raw, width=1920, height=1080 - ! videoconvert dither=0 - ! video/x-raw, format=YV12 - ! x264enc pass=4 b-adapt=false key-int-max=%u speed-preset=4 tune=4 - ! h264parse - ! video/x-h264, alignment=nal, stream-format=byte-stream - %s - ! mpegtsmux name=muxer - ! rtpmp2tpay - ! .send_rtp_sink_0 rtpbin name=session do-retransmission=true - do-sync-event=true do-lost=true ntp-time-source=3 - buffer-mode=0 latency=20 max-misorder-time=30 - ! application/x-rtp - ! udpsink sync=false async=false host="%s" port=%u """, + desc.append_printf( + "ximagesrc name=vsrc use-damage=false show-pointer=false " + + "startx=%u starty=%u endx=%u endy=%u " + + "! video/x-raw, framerate=%u/1 " + + "! videoscale method=0 " + + "! video/x-raw, width=1920, height=1080 " + + "! videoconvert dither=0 " + + "! video/x-raw, format=YV12 " + + "! x264enc pass=4 b-adapt=false key-int-max=%u speed-preset=4 tune=4 " + + "! h264parse " + + "! video/x-h264, alignment=nal, stream-format=byte-stream " + + "%s " + + "! mpegtsmux name=muxer " + + "! rtpmp2tpay " + + "! .send_rtp_sink_0 rtpbin name=session do-retransmission=true " + + "do-sync-event=true do-lost=true ntp-time-source=3 " + + "buffer-mode=0 latency=20 max-misorder-time=30 " + + "! application/x-rtp " + + "! udpsink sync=false async=false host=\"%s\" port=%u ", configs.contains(DispdEncoderConfig.X) ? configs.get(DispdEncoderConfig.X).get_uint32() : 0, @@ -293,7 +293,6 @@ internal class GstEncoder : DispdEncoder, GLib.Object } pipeline.set_state(Gst.State.NULL); - pipeline = null; } public async void prepare() throws Error diff --git a/src/disp/disp.h b/src/disp/disp.h index 1c1da24..23e7b74 100644 --- a/src/disp/disp.h +++ b/src/disp/disp.h @@ -57,6 +57,7 @@ enum wfd_session_state WFD_SESSION_STATE_PAUSED, WFD_SESSION_STATE_PLAYING, WFD_SESSION_STATE_TEARING_DOWN, + WFD_SESSION_STATE_TERMINATING, }; struct wfd_rectangle @@ -93,6 +94,7 @@ int wfd_session_start(struct wfd_session *s); int wfd_session_resume(struct wfd_session *s); int wfd_session_pause(struct wfd_session *s); int wfd_session_teardown(struct wfd_session *s); +int wfd_session_terminate(struct wfd_session *s); int wfd_session_is_established(struct wfd_session *s); unsigned int wfd_session_get_id(struct wfd_session *s); diff --git a/src/disp/dispd-encoder.c b/src/disp/dispd-encoder.c index 5f7ecad..4bb0e21 100644 --- a/src/disp/dispd-encoder.c +++ b/src/disp/dispd-encoder.c @@ -39,6 +39,8 @@ struct dispd_encoder sd_event_source *pipe_source; sd_bus *bus; + sd_bus_slot *name_disappeared_slot; + sd_bus_slot *state_change_notify_slot; char *name; @@ -87,7 +89,6 @@ static int dispd_encoder_exec(const char *cmd, int fd, struct wfd_session *s) (char *[]){ (char *) cmd, NULL }, (char *[]){ disp, auth, - "GST_DEBUG=3", "G_MESSAGES_DEBUG=all", NULL }); @@ -102,23 +103,30 @@ static void dispd_encoder_close_pipe(struct dispd_encoder *e) } close(sd_event_source_get_io_fd(e->pipe_source)); - sd_event_source_set_enabled(e->pipe_source, false); + sd_event_source_unref(e->pipe_source); e->pipe_source = NULL; + + dispd_encoder_unref(e); } -static void dispd_encoder_kill_child(struct dispd_encoder *e) +static int dispd_encoder_kill_child(struct dispd_encoder *e) { + int r; pid_t pid; if(!e->child_source) { - return; + return 0; } + // TODO add timer in case child can't be terminated by SIGTERM sd_event_source_get_child_pid(e->child_source, &pid); - kill(pid, SIGKILL); - sd_event_source_set_enabled(e->child_source, false); - sd_event_source_unref(e->child_source); + r = kill(pid, SIGTERM); + if(0 > r) { + return log_ERRNO(); + } + + return 1; } static void dispd_encoder_notify_state_change(struct dispd_encoder *e, @@ -135,6 +143,29 @@ static void dispd_encoder_notify_state_change(struct dispd_encoder *e, dispd_encoder_unref(e); } +static void dispd_encoder_cleanup(struct dispd_encoder *e) +{ + if(e->child_source) { + sd_event_source_unref(e->child_source); + e->child_source = NULL; + dispd_encoder_unref(e); + } + + dispd_encoder_close_pipe(e); + + if(e->name_disappeared_slot) { + sd_bus_slot_unref(e->name_disappeared_slot); + e->name_disappeared_slot = NULL; + dispd_encoder_unref(e); + } + + if(e->state_change_notify_slot) { + sd_bus_slot_unref(e->state_change_notify_slot); + e->state_change_notify_slot = NULL; + dispd_encoder_unref(e); + } +} + static int dispd_encoder_on_terminated(sd_event_source *source, const siginfo_t *si, void *userdata) @@ -143,9 +174,8 @@ static int dispd_encoder_on_terminated(sd_event_source *source, log_info("encoder %d terminated", si->si_pid); - if(e) { - dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED); - } + dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED); + dispd_encoder_cleanup(e); return 0; } @@ -190,7 +220,7 @@ int dispd_encoder_spawn(struct dispd_encoder **out, struct wfd_session *s) pid, WEXITED, dispd_encoder_on_terminated, - e); + dispd_encoder_ref(e)); if(0 > r) { goto close_pipe; } @@ -200,7 +230,7 @@ int dispd_encoder_spawn(struct dispd_encoder **out, struct wfd_session *s) fds[0], EPOLLIN, dispd_encoder_on_unique_name, - e); + dispd_encoder_ref(e)); if(0 > r) { goto close_pipe; } @@ -265,6 +295,8 @@ void dispd_encoder_unref(struct dispd_encoder *e) return; } + /* since we encrease ref count at creation of every sources and slots, + * once we get here, it means no sources and slots exist anymore */ if(e->bus) { sd_bus_unref(e->bus); } @@ -273,9 +305,6 @@ void dispd_encoder_unref(struct dispd_encoder *e) free(e->name); } - dispd_encoder_close_pipe(e); - dispd_encoder_kill_child(e); - free(e); } @@ -405,10 +434,19 @@ static int on_encoder_disappeared(sd_bus_message *m, sd_bus_error *ret_error) { struct dispd_encoder *e = userdata; + int r; - log_info("encoder disappered"); - - dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED); + log_info("encoder %s disappered", e->name); + + r = dispd_encoder_kill_child(e); + if(0 > r) { + return r; + } + else if(r) { + return 0; + } + + dispd_encoder_cleanup(e); return 0; } @@ -459,10 +497,11 @@ static int dispd_encoder_on_unique_name(sd_event_source *source, "member='PropertiesChanged'," "arg0='org.freedesktop.miracle.encoder'", e->name); - r = sd_bus_add_match(e->bus, NULL, - buf, - on_encoder_properties_changed, - e); + r = sd_bus_add_match(e->bus, + &e->state_change_notify_slot, + buf, + on_encoder_properties_changed, + dispd_encoder_ref(e)); if(0 > r) { goto error; } @@ -475,11 +514,11 @@ static int dispd_encoder_on_unique_name(sd_event_source *source, "member='NameOwnerChanged'," "arg0namespace='%s'", e->name); - r = sd_bus_add_match(e->bus, NULL, - buf, - on_encoder_disappeared, - e); - + r = sd_bus_add_match(e->bus, + &e->name_disappeared_slot, + buf, + on_encoder_disappeared, + dispd_encoder_ref(e)); dispd_encoder_set_state(e, DISPD_ENCODER_STATE_SPAWNED); @@ -670,27 +709,34 @@ static int dispd_encoder_call(struct dispd_encoder *e, const char *method) { _cleanup_sd_bus_message_ sd_bus_message *call = NULL; _cleanup_sd_bus_message_ sd_bus_message *reply = NULL; - sd_bus_error error = { 0 }; - int r = sd_bus_message_new_method_call(e->bus, + _cleanup_sd_bus_error_ sd_bus_error error = SD_BUS_ERROR_NULL; + int r; + + assert(e); + assert(method); + assert(e->bus); + + r = sd_bus_message_new_method_call(e->bus, &call, e->name, "/org/freedesktop/miracle/encoder", "org.freedesktop.miracle.encoder", method); if(0 > r) { - return r; + goto error; } r = sd_bus_call(e->bus, call, 0, &error, &reply); if(0 > r) { - log_warning("error invoke method %s: %s, %s", - method, - error.name, - error.message); - sd_bus_error_free(&error); + goto error; } - return r; + return 0; + +error: + dispd_encoder_kill_child(e); + + return log_ERRNO(); } int dispd_encoder_start(struct dispd_encoder *e) diff --git a/src/disp/wfd-out-session.c b/src/disp/wfd-out-session.c index 4bead00..01e19cd 100644 --- a/src/disp/wfd-out-session.c +++ b/src/disp/wfd-out-session.c @@ -243,7 +243,6 @@ int wfd_out_session_teardown(struct wfd_session *s) void wfd_out_session_destroy(struct wfd_session *s) { -// pid_t pid; struct wfd_out_session *os = wfd_out_session(s); if(0 <= os->fd) { close(os->fd); @@ -251,6 +250,7 @@ void wfd_out_session_destroy(struct wfd_session *s) } if(os->encoder) { + dispd_encoder_stop(os->encoder); dispd_encoder_set_handler(os->encoder, NULL, NULL); dispd_encoder_unref(os->encoder); os->encoder = NULL; diff --git a/src/disp/wfd-session.c b/src/disp/wfd-session.c index 6217dd1..6494ae5 100644 --- a/src/disp/wfd-session.c +++ b/src/disp/wfd-session.c @@ -47,7 +47,6 @@ static const char * rtsp_message_id_to_string(enum rtsp_message_id id); static int wfd_session_handle_request(struct rtsp *bus, struct rtsp_message *m, void *userdata); -static void wfd_session_hup(struct wfd_session *s); const struct wfd_session_vtable session_vtbl[] = { [WFD_SESSION_DIR_OUT] = { @@ -179,7 +178,6 @@ int wfd_session_pause(struct wfd_session *s) int wfd_session_teardown(struct wfd_session *s) { - log_info("wfd_session_teardown(%p)", s); assert(wfd_is_session(s)); if(wfd_session_is_established(s)) { @@ -191,38 +189,30 @@ int wfd_session_teardown(struct wfd_session *s) } else { /* notify and detach from sink */ + wfd_session_terminate(s); wfd_fn_out_session_ended(s); } return 0; } -struct wfd_session * wfd_session_ref(struct wfd_session *s) -{ - if(s) { - ++ s->ref; - } - - return s; -} - -void wfd_session_unref(struct wfd_session *s) +int wfd_session_terminate(struct wfd_session *s) { if(!s) { - return; - } - - assert(1 <= s->ref); - - -- s->ref; - if(s->ref) { - return; + return 0; } if(session_vtbl[s->dir].destroy) { (*session_vtbl[s->dir].destroy)(s); } + if(s->rtsp) { + rtsp_remove_match(s->rtsp, wfd_session_handle_request, s); + rtsp_detach_event(s->rtsp); + rtsp_unref(s->rtsp); + s->rtsp = NULL; + } + if(s->vformats) { wfd_video_formats_free(s->vformats); s->vformats = NULL; @@ -253,12 +243,39 @@ void wfd_session_unref(struct wfd_session *s) s->audio_dev_name = NULL; } - wfd_session_hup(s); - s->rtp_ports[0] = 0; s->rtp_ports[1] = 0; s->last_request = RTSP_M_UNKNOWN; + s->state = WFD_SESSION_STATE_TERMINATING; + + return 0; +} + +struct wfd_session * wfd_session_ref(struct wfd_session *s) +{ + if(s) { + ++ s->ref; + } + + return s; +} + +void wfd_session_unref(struct wfd_session *s) +{ + if(!s) { + return; + } + + assert(1 <= s->ref); + + -- s->ref; + if(s->ref) { + return; + } + + wfd_session_terminate(s); + free(s); } @@ -456,11 +473,6 @@ static int wfd_session_handle_request(struct rtsp *bus, goto error; } - if(WFD_SESSION_STATE_TEARING_DOWN == wfd_session_get_state(s)) { - wfd_session_hup(s); - return 0; - } - r = sd_event_now(ctl_wfd_get_loop(), CLOCK_REALTIME, &usec); if(0 > r) { goto error; @@ -498,11 +510,9 @@ static int wfd_session_handle_request(struct rtsp *bus, return 0; error: - log_warning("error while handling request: %s", strerror(-r)); - wfd_session_teardown(s); - - return r; + wfd_session_terminate(s); + return log_ERRNO(); } static int wfd_session_handle_reply(struct rtsp *bus, @@ -544,7 +554,7 @@ static int wfd_session_handle_reply(struct rtsp *bus, error: log_info("error while handling reply: %s", strerror(-r)); - wfd_session_teardown(s); + wfd_session_terminate(s); return r; } @@ -621,33 +631,31 @@ static int wfd_session_handle_io(sd_event_source *source, if (mask & EPOLLERR) { r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); if(0 > r) { - goto end; + return log_ERRNO(); } } if (mask & EPOLLIN || mask & EPOLLOUT) { r = (*session_vtbl[s->dir].handle_io)(s, err, &conn); if(0 > r) { - goto end; + return log_ERRNO(); } r = rtsp_open(&rtsp, conn); if (0 > r) { - goto end; + return log_ERRNO(); } - log_trace("rtsp->ref = %ld", *(unsigned long *) rtsp); - conn = -1; r = rtsp_attach_event(rtsp, ctl_wfd_get_loop(), 0); if (0 > r) { - goto end; + return log_ERRNO(); } r = rtsp_add_match(rtsp, wfd_session_handle_request, s); if (0 > r) { - goto end; + return log_ERRNO(); } s->rtsp = rtsp; @@ -659,28 +667,10 @@ static int wfd_session_handle_io(sd_event_source *source, } if(mask & EPOLLHUP) { - r = -ESHUTDOWN; - } - -end: - if (0 > r) { - log_warning("error while handling I/O: %s", strerror(-r)); wfd_session_teardown(s); } - return r; -} - -static void wfd_session_hup(struct wfd_session *s) -{ - if(!s || !s->rtsp) { - return; - } - - rtsp_remove_match(s->rtsp, wfd_session_handle_request, s); - rtsp_detach_event(s->rtsp); - rtsp_unref(s->rtsp); - s->rtsp = NULL; + return 0; } int wfd_session_start(struct wfd_session *s)