1
0
Fork 0
mirror of https://github.com/albfan/miraclecast.git synced 2025-02-15 04:42:06 +00:00

miracle-dispd: improve encoder lifecycle management

Change-Id: Ic651e4795e4c9579978fe3e3529d8d61e739ffd1
This commit is contained in:
Derek Dai 2017-04-21 14:46:10 +08:00
parent c5fb1487e0
commit 875fa86f6b
No known key found for this signature in database
GPG key ID: E109CC97553EF009
5 changed files with 152 additions and 115 deletions

View file

@ -110,25 +110,25 @@ internal class GstEncoder : DispdEncoder, GLib.Object
? configs.get(DispdEncoderConfig.FRAMERATE).get_uint32() ? configs.get(DispdEncoderConfig.FRAMERATE).get_uint32()
: 30; : 30;
StringBuilder desc = new StringBuilder(); StringBuilder desc = new StringBuilder();
desc.append_printf(""" desc.append_printf(
ximagesrc name=vsrc use-damage=false show-pointer=false "ximagesrc name=vsrc use-damage=false show-pointer=false " +
startx=%u starty=%u endx=%u endy=%u "startx=%u starty=%u endx=%u endy=%u " +
! video/x-raw, framerate=%u/1 "! video/x-raw, framerate=%u/1 " +
! videoscale method=0 "! videoscale method=0 " +
! video/x-raw, width=1920, height=1080 "! video/x-raw, width=1920, height=1080 " +
! videoconvert dither=0 "! videoconvert dither=0 " +
! video/x-raw, format=YV12 "! video/x-raw, format=YV12 " +
! x264enc pass=4 b-adapt=false key-int-max=%u speed-preset=4 tune=4 "! x264enc pass=4 b-adapt=false key-int-max=%u speed-preset=4 tune=4 " +
! h264parse "! h264parse " +
! video/x-h264, alignment=nal, stream-format=byte-stream "! video/x-h264, alignment=nal, stream-format=byte-stream " +
%s "%s " +
! mpegtsmux name=muxer "! mpegtsmux name=muxer " +
! rtpmp2tpay "! rtpmp2tpay " +
! .send_rtp_sink_0 rtpbin name=session do-retransmission=true "! .send_rtp_sink_0 rtpbin name=session do-retransmission=true " +
do-sync-event=true do-lost=true ntp-time-source=3 "do-sync-event=true do-lost=true ntp-time-source=3 " +
buffer-mode=0 latency=20 max-misorder-time=30 "buffer-mode=0 latency=20 max-misorder-time=30 " +
! application/x-rtp "! application/x-rtp " +
! udpsink sync=false async=false host="%s" port=%u """, "! udpsink sync=false async=false host=\"%s\" port=%u ",
configs.contains(DispdEncoderConfig.X) configs.contains(DispdEncoderConfig.X)
? configs.get(DispdEncoderConfig.X).get_uint32() ? configs.get(DispdEncoderConfig.X).get_uint32()
: 0, : 0,
@ -293,7 +293,6 @@ internal class GstEncoder : DispdEncoder, GLib.Object
} }
pipeline.set_state(Gst.State.NULL); pipeline.set_state(Gst.State.NULL);
pipeline = null;
} }
public async void prepare() throws Error public async void prepare() throws Error

View file

@ -57,6 +57,7 @@ enum wfd_session_state
WFD_SESSION_STATE_PAUSED, WFD_SESSION_STATE_PAUSED,
WFD_SESSION_STATE_PLAYING, WFD_SESSION_STATE_PLAYING,
WFD_SESSION_STATE_TEARING_DOWN, WFD_SESSION_STATE_TEARING_DOWN,
WFD_SESSION_STATE_TERMINATING,
}; };
struct wfd_rectangle struct wfd_rectangle
@ -93,6 +94,7 @@ int wfd_session_start(struct wfd_session *s);
int wfd_session_resume(struct wfd_session *s); int wfd_session_resume(struct wfd_session *s);
int wfd_session_pause(struct wfd_session *s); int wfd_session_pause(struct wfd_session *s);
int wfd_session_teardown(struct wfd_session *s); int wfd_session_teardown(struct wfd_session *s);
int wfd_session_terminate(struct wfd_session *s);
int wfd_session_is_established(struct wfd_session *s); int wfd_session_is_established(struct wfd_session *s);
unsigned int wfd_session_get_id(struct wfd_session *s); unsigned int wfd_session_get_id(struct wfd_session *s);

View file

@ -39,6 +39,8 @@ struct dispd_encoder
sd_event_source *pipe_source; sd_event_source *pipe_source;
sd_bus *bus; sd_bus *bus;
sd_bus_slot *name_disappeared_slot;
sd_bus_slot *state_change_notify_slot;
char *name; char *name;
@ -87,7 +89,6 @@ static int dispd_encoder_exec(const char *cmd, int fd, struct wfd_session *s)
(char *[]){ (char *) cmd, NULL }, (char *[]){ (char *) cmd, NULL },
(char *[]){ disp, (char *[]){ disp,
auth, auth,
"GST_DEBUG=3",
"G_MESSAGES_DEBUG=all", "G_MESSAGES_DEBUG=all",
NULL NULL
}); });
@ -102,23 +103,30 @@ static void dispd_encoder_close_pipe(struct dispd_encoder *e)
} }
close(sd_event_source_get_io_fd(e->pipe_source)); close(sd_event_source_get_io_fd(e->pipe_source));
sd_event_source_set_enabled(e->pipe_source, false);
sd_event_source_unref(e->pipe_source); sd_event_source_unref(e->pipe_source);
e->pipe_source = NULL; e->pipe_source = NULL;
dispd_encoder_unref(e);
} }
static void dispd_encoder_kill_child(struct dispd_encoder *e) static int dispd_encoder_kill_child(struct dispd_encoder *e)
{ {
int r;
pid_t pid; pid_t pid;
if(!e->child_source) { if(!e->child_source) {
return; return 0;
} }
// TODO add timer in case child can't be terminated by SIGTERM
sd_event_source_get_child_pid(e->child_source, &pid); sd_event_source_get_child_pid(e->child_source, &pid);
kill(pid, SIGKILL); r = kill(pid, SIGTERM);
sd_event_source_set_enabled(e->child_source, false); if(0 > r) {
sd_event_source_unref(e->child_source); return log_ERRNO();
}
return 1;
} }
static void dispd_encoder_notify_state_change(struct dispd_encoder *e, static void dispd_encoder_notify_state_change(struct dispd_encoder *e,
@ -135,6 +143,29 @@ static void dispd_encoder_notify_state_change(struct dispd_encoder *e,
dispd_encoder_unref(e); dispd_encoder_unref(e);
} }
static void dispd_encoder_cleanup(struct dispd_encoder *e)
{
if(e->child_source) {
sd_event_source_unref(e->child_source);
e->child_source = NULL;
dispd_encoder_unref(e);
}
dispd_encoder_close_pipe(e);
if(e->name_disappeared_slot) {
sd_bus_slot_unref(e->name_disappeared_slot);
e->name_disappeared_slot = NULL;
dispd_encoder_unref(e);
}
if(e->state_change_notify_slot) {
sd_bus_slot_unref(e->state_change_notify_slot);
e->state_change_notify_slot = NULL;
dispd_encoder_unref(e);
}
}
static int dispd_encoder_on_terminated(sd_event_source *source, static int dispd_encoder_on_terminated(sd_event_source *source,
const siginfo_t *si, const siginfo_t *si,
void *userdata) void *userdata)
@ -143,9 +174,8 @@ static int dispd_encoder_on_terminated(sd_event_source *source,
log_info("encoder %d terminated", si->si_pid); log_info("encoder %d terminated", si->si_pid);
if(e) { dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED);
dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED); dispd_encoder_cleanup(e);
}
return 0; return 0;
} }
@ -190,7 +220,7 @@ int dispd_encoder_spawn(struct dispd_encoder **out, struct wfd_session *s)
pid, pid,
WEXITED, WEXITED,
dispd_encoder_on_terminated, dispd_encoder_on_terminated,
e); dispd_encoder_ref(e));
if(0 > r) { if(0 > r) {
goto close_pipe; goto close_pipe;
} }
@ -200,7 +230,7 @@ int dispd_encoder_spawn(struct dispd_encoder **out, struct wfd_session *s)
fds[0], fds[0],
EPOLLIN, EPOLLIN,
dispd_encoder_on_unique_name, dispd_encoder_on_unique_name,
e); dispd_encoder_ref(e));
if(0 > r) { if(0 > r) {
goto close_pipe; goto close_pipe;
} }
@ -265,6 +295,8 @@ void dispd_encoder_unref(struct dispd_encoder *e)
return; return;
} }
/* since we encrease ref count at creation of every sources and slots,
* once we get here, it means no sources and slots exist anymore */
if(e->bus) { if(e->bus) {
sd_bus_unref(e->bus); sd_bus_unref(e->bus);
} }
@ -273,9 +305,6 @@ void dispd_encoder_unref(struct dispd_encoder *e)
free(e->name); free(e->name);
} }
dispd_encoder_close_pipe(e);
dispd_encoder_kill_child(e);
free(e); free(e);
} }
@ -405,10 +434,19 @@ static int on_encoder_disappeared(sd_bus_message *m,
sd_bus_error *ret_error) sd_bus_error *ret_error)
{ {
struct dispd_encoder *e = userdata; struct dispd_encoder *e = userdata;
int r;
log_info("encoder disappered"); log_info("encoder %s disappered", e->name);
dispd_encoder_set_state(e, DISPD_ENCODER_STATE_TERMINATED); r = dispd_encoder_kill_child(e);
if(0 > r) {
return r;
}
else if(r) {
return 0;
}
dispd_encoder_cleanup(e);
return 0; return 0;
} }
@ -459,10 +497,11 @@ static int dispd_encoder_on_unique_name(sd_event_source *source,
"member='PropertiesChanged'," "member='PropertiesChanged',"
"arg0='org.freedesktop.miracle.encoder'", "arg0='org.freedesktop.miracle.encoder'",
e->name); e->name);
r = sd_bus_add_match(e->bus, NULL, r = sd_bus_add_match(e->bus,
buf, &e->state_change_notify_slot,
on_encoder_properties_changed, buf,
e); on_encoder_properties_changed,
dispd_encoder_ref(e));
if(0 > r) { if(0 > r) {
goto error; goto error;
} }
@ -475,11 +514,11 @@ static int dispd_encoder_on_unique_name(sd_event_source *source,
"member='NameOwnerChanged'," "member='NameOwnerChanged',"
"arg0namespace='%s'", "arg0namespace='%s'",
e->name); e->name);
r = sd_bus_add_match(e->bus, NULL, r = sd_bus_add_match(e->bus,
buf, &e->name_disappeared_slot,
on_encoder_disappeared, buf,
e); on_encoder_disappeared,
dispd_encoder_ref(e));
dispd_encoder_set_state(e, DISPD_ENCODER_STATE_SPAWNED); dispd_encoder_set_state(e, DISPD_ENCODER_STATE_SPAWNED);
@ -670,27 +709,34 @@ static int dispd_encoder_call(struct dispd_encoder *e, const char *method)
{ {
_cleanup_sd_bus_message_ sd_bus_message *call = NULL; _cleanup_sd_bus_message_ sd_bus_message *call = NULL;
_cleanup_sd_bus_message_ sd_bus_message *reply = NULL; _cleanup_sd_bus_message_ sd_bus_message *reply = NULL;
sd_bus_error error = { 0 }; _cleanup_sd_bus_error_ sd_bus_error error = SD_BUS_ERROR_NULL;
int r = sd_bus_message_new_method_call(e->bus, int r;
assert(e);
assert(method);
assert(e->bus);
r = sd_bus_message_new_method_call(e->bus,
&call, &call,
e->name, e->name,
"/org/freedesktop/miracle/encoder", "/org/freedesktop/miracle/encoder",
"org.freedesktop.miracle.encoder", "org.freedesktop.miracle.encoder",
method); method);
if(0 > r) { if(0 > r) {
return r; goto error;
} }
r = sd_bus_call(e->bus, call, 0, &error, &reply); r = sd_bus_call(e->bus, call, 0, &error, &reply);
if(0 > r) { if(0 > r) {
log_warning("error invoke method %s: %s, %s", goto error;
method,
error.name,
error.message);
sd_bus_error_free(&error);
} }
return r; return 0;
error:
dispd_encoder_kill_child(e);
return log_ERRNO();
} }
int dispd_encoder_start(struct dispd_encoder *e) int dispd_encoder_start(struct dispd_encoder *e)

View file

@ -243,7 +243,6 @@ int wfd_out_session_teardown(struct wfd_session *s)
void wfd_out_session_destroy(struct wfd_session *s) void wfd_out_session_destroy(struct wfd_session *s)
{ {
// pid_t pid;
struct wfd_out_session *os = wfd_out_session(s); struct wfd_out_session *os = wfd_out_session(s);
if(0 <= os->fd) { if(0 <= os->fd) {
close(os->fd); close(os->fd);
@ -251,6 +250,7 @@ void wfd_out_session_destroy(struct wfd_session *s)
} }
if(os->encoder) { if(os->encoder) {
dispd_encoder_stop(os->encoder);
dispd_encoder_set_handler(os->encoder, NULL, NULL); dispd_encoder_set_handler(os->encoder, NULL, NULL);
dispd_encoder_unref(os->encoder); dispd_encoder_unref(os->encoder);
os->encoder = NULL; os->encoder = NULL;

View file

@ -47,7 +47,6 @@ static const char * rtsp_message_id_to_string(enum rtsp_message_id id);
static int wfd_session_handle_request(struct rtsp *bus, static int wfd_session_handle_request(struct rtsp *bus,
struct rtsp_message *m, struct rtsp_message *m,
void *userdata); void *userdata);
static void wfd_session_hup(struct wfd_session *s);
const struct wfd_session_vtable session_vtbl[] = { const struct wfd_session_vtable session_vtbl[] = {
[WFD_SESSION_DIR_OUT] = { [WFD_SESSION_DIR_OUT] = {
@ -179,7 +178,6 @@ int wfd_session_pause(struct wfd_session *s)
int wfd_session_teardown(struct wfd_session *s) int wfd_session_teardown(struct wfd_session *s)
{ {
log_info("wfd_session_teardown(%p)", s);
assert(wfd_is_session(s)); assert(wfd_is_session(s));
if(wfd_session_is_established(s)) { if(wfd_session_is_established(s)) {
@ -191,38 +189,30 @@ int wfd_session_teardown(struct wfd_session *s)
} }
else { else {
/* notify and detach from sink */ /* notify and detach from sink */
wfd_session_terminate(s);
wfd_fn_out_session_ended(s); wfd_fn_out_session_ended(s);
} }
return 0; return 0;
} }
struct wfd_session * wfd_session_ref(struct wfd_session *s) int wfd_session_terminate(struct wfd_session *s)
{
if(s) {
++ s->ref;
}
return s;
}
void wfd_session_unref(struct wfd_session *s)
{ {
if(!s) { if(!s) {
return; return 0;
}
assert(1 <= s->ref);
-- s->ref;
if(s->ref) {
return;
} }
if(session_vtbl[s->dir].destroy) { if(session_vtbl[s->dir].destroy) {
(*session_vtbl[s->dir].destroy)(s); (*session_vtbl[s->dir].destroy)(s);
} }
if(s->rtsp) {
rtsp_remove_match(s->rtsp, wfd_session_handle_request, s);
rtsp_detach_event(s->rtsp);
rtsp_unref(s->rtsp);
s->rtsp = NULL;
}
if(s->vformats) { if(s->vformats) {
wfd_video_formats_free(s->vformats); wfd_video_formats_free(s->vformats);
s->vformats = NULL; s->vformats = NULL;
@ -253,12 +243,39 @@ void wfd_session_unref(struct wfd_session *s)
s->audio_dev_name = NULL; s->audio_dev_name = NULL;
} }
wfd_session_hup(s);
s->rtp_ports[0] = 0; s->rtp_ports[0] = 0;
s->rtp_ports[1] = 0; s->rtp_ports[1] = 0;
s->last_request = RTSP_M_UNKNOWN; s->last_request = RTSP_M_UNKNOWN;
s->state = WFD_SESSION_STATE_TERMINATING;
return 0;
}
struct wfd_session * wfd_session_ref(struct wfd_session *s)
{
if(s) {
++ s->ref;
}
return s;
}
void wfd_session_unref(struct wfd_session *s)
{
if(!s) {
return;
}
assert(1 <= s->ref);
-- s->ref;
if(s->ref) {
return;
}
wfd_session_terminate(s);
free(s); free(s);
} }
@ -456,11 +473,6 @@ static int wfd_session_handle_request(struct rtsp *bus,
goto error; goto error;
} }
if(WFD_SESSION_STATE_TEARING_DOWN == wfd_session_get_state(s)) {
wfd_session_hup(s);
return 0;
}
r = sd_event_now(ctl_wfd_get_loop(), CLOCK_REALTIME, &usec); r = sd_event_now(ctl_wfd_get_loop(), CLOCK_REALTIME, &usec);
if(0 > r) { if(0 > r) {
goto error; goto error;
@ -498,11 +510,9 @@ static int wfd_session_handle_request(struct rtsp *bus,
return 0; return 0;
error: error:
log_warning("error while handling request: %s", strerror(-r)); wfd_session_terminate(s);
wfd_session_teardown(s);
return r;
return log_ERRNO();
} }
static int wfd_session_handle_reply(struct rtsp *bus, static int wfd_session_handle_reply(struct rtsp *bus,
@ -544,7 +554,7 @@ static int wfd_session_handle_reply(struct rtsp *bus,
error: error:
log_info("error while handling reply: %s", strerror(-r)); log_info("error while handling reply: %s", strerror(-r));
wfd_session_teardown(s); wfd_session_terminate(s);
return r; return r;
} }
@ -621,33 +631,31 @@ static int wfd_session_handle_io(sd_event_source *source,
if (mask & EPOLLERR) { if (mask & EPOLLERR) {
r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
if(0 > r) { if(0 > r) {
goto end; return log_ERRNO();
} }
} }
if (mask & EPOLLIN || mask & EPOLLOUT) { if (mask & EPOLLIN || mask & EPOLLOUT) {
r = (*session_vtbl[s->dir].handle_io)(s, err, &conn); r = (*session_vtbl[s->dir].handle_io)(s, err, &conn);
if(0 > r) { if(0 > r) {
goto end; return log_ERRNO();
} }
r = rtsp_open(&rtsp, conn); r = rtsp_open(&rtsp, conn);
if (0 > r) { if (0 > r) {
goto end; return log_ERRNO();
} }
log_trace("rtsp->ref = %ld", *(unsigned long *) rtsp);
conn = -1; conn = -1;
r = rtsp_attach_event(rtsp, ctl_wfd_get_loop(), 0); r = rtsp_attach_event(rtsp, ctl_wfd_get_loop(), 0);
if (0 > r) { if (0 > r) {
goto end; return log_ERRNO();
} }
r = rtsp_add_match(rtsp, wfd_session_handle_request, s); r = rtsp_add_match(rtsp, wfd_session_handle_request, s);
if (0 > r) { if (0 > r) {
goto end; return log_ERRNO();
} }
s->rtsp = rtsp; s->rtsp = rtsp;
@ -659,28 +667,10 @@ static int wfd_session_handle_io(sd_event_source *source,
} }
if(mask & EPOLLHUP) { if(mask & EPOLLHUP) {
r = -ESHUTDOWN;
}
end:
if (0 > r) {
log_warning("error while handling I/O: %s", strerror(-r));
wfd_session_teardown(s); wfd_session_teardown(s);
} }
return r; return 0;
}
static void wfd_session_hup(struct wfd_session *s)
{
if(!s || !s->rtsp) {
return;
}
rtsp_remove_match(s->rtsp, wfd_session_handle_request, s);
rtsp_detach_event(s->rtsp);
rtsp_unref(s->rtsp);
s->rtsp = NULL;
} }
int wfd_session_start(struct wfd_session *s) int wfd_session_start(struct wfd_session *s)