From eed49bf863db42257c7f23615da6a821e55fb207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=82=E3=81=8F?= Date: Thu, 17 Mar 2022 16:44:54 +0700 Subject: [PATCH] [FL-2335] Gui, Rpc: multisession, asynchronous screen streaming with adaptive frame rate (#1033) * Gui,Rpc: multisession, asynchronous screen streaming with adaptive frame rate * Fix compact build, add missing aray initialization. --- applications/gui/gui.c | 48 +++++++++++++------- applications/gui/gui.h | 20 +++++--- applications/gui/gui_i.h | 15 +++++- applications/rpc/rpc.c | 9 +++- applications/rpc/rpc_gui.c | 93 +++++++++++++++++++++++++++++--------- applications/rpc/rpc_i.h | 3 ++ 6 files changed, 139 insertions(+), 49 deletions(-) diff --git a/applications/gui/gui.c b/applications/gui/gui.c index 06733543..69f0e248 100644 --- a/applications/gui/gui.c +++ b/applications/gui/gui.c @@ -211,12 +211,11 @@ void gui_redraw(Gui* gui) { } canvas_commit(gui->canvas); - if(gui->canvas_callback) { - gui->canvas_callback( - canvas_get_buffer(gui->canvas), - canvas_get_buffer_size(gui->canvas), - gui->canvas_callback_context); - } + for + M_EACH(p, gui->canvas_callback_pair, CanvasCallbackPairArray_t) { + p->callback( + canvas_get_buffer(gui->canvas), canvas_get_buffer_size(gui->canvas), p->context); + } gui_unlock(gui); } @@ -396,24 +395,36 @@ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port) { gui_unlock(gui); } -void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) { +void gui_add_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) { furi_assert(gui); - gui_lock(gui); - gui->canvas_callback = callback; - gui->canvas_callback_context = context; - gui_unlock(gui); - if(callback != NULL) { - gui_update(gui); - } + const CanvasCallbackPair p = {callback, context}; + + gui_lock(gui); + + furi_assert(CanvasCallbackPairArray_count(gui->canvas_callback_pair, p) == 0); + CanvasCallbackPairArray_push_back(gui->canvas_callback_pair, p); + + gui_unlock(gui); + gui_update(gui); } -GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui) { +void gui_remove_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) { furi_assert(gui); + + const CanvasCallbackPair p = {callback, context}; + gui_lock(gui); - GuiCanvasCommitCallback callback = gui->canvas_callback; + + furi_assert(CanvasCallbackPairArray_count(gui->canvas_callback_pair, p) == 1); + CanvasCallbackPairArray_remove_val(gui->canvas_callback_pair, p); + gui_unlock(gui); - return callback; +} + +size_t gui_get_framebuffer_size(Gui* gui) { + furi_assert(gui); + return canvas_get_buffer_size(gui->canvas); } void gui_set_lockdown(Gui* gui, bool lockdown) { @@ -437,9 +448,12 @@ Gui* gui_alloc() { } // Drawing canvas gui->canvas = canvas_init(); + CanvasCallbackPairArray_init(gui->canvas_callback_pair); + // Input gui->input_queue = osMessageQueueNew(8, sizeof(InputEvent), NULL); gui->input_events = furi_record_open("input_events"); + furi_check(gui->input_events); furi_pubsub_subscribe(gui->input_events, gui_input_events_callback, gui); diff --git a/applications/gui/gui.h b/applications/gui/gui.h index 39d81c79..e3235242 100644 --- a/applications/gui/gui.h +++ b/applications/gui/gui.h @@ -68,7 +68,7 @@ void gui_view_port_send_to_front(Gui* gui, ViewPort* view_port); */ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port); -/** Set gui canvas commit callback +/** Add gui canvas commit callback * * This callback will be called upon Canvas commit Callback dispatched from GUI * thread and is time critical @@ -77,16 +77,22 @@ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port); * @param callback GuiCanvasCommitCallback * @param context GuiCanvasCommitCallback context */ -void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context); +void gui_add_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context); -/** Get gui canvas commit callback - * - * Can be used to check if some application is using framebufer +/** Remove gui canvas commit callback * * @param gui Gui instance - * @return GuiCanvasCommitCallback + * @param callback GuiCanvasCommitCallback + * @param context GuiCanvasCommitCallback context */ -GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui); +void gui_remove_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context); + +/** Get gui canvas frame buffer size + * * + * @param gui Gui instance + * @return size_t size of frame buffer in bytes + */ +size_t gui_get_framebuffer_size(Gui* gui); /** Set lockdown mode * diff --git a/applications/gui/gui_i.h b/applications/gui/gui_i.h index 479b5592..ea26339d 100644 --- a/applications/gui/gui_i.h +++ b/applications/gui/gui_i.h @@ -9,6 +9,7 @@ #include #include +#include #include #include "canvas.h" @@ -42,6 +43,17 @@ ARRAY_DEF(ViewPortArray, ViewPort*, M_PTR_OPLIST); +typedef struct { + GuiCanvasCommitCallback callback; + void* context; +} CanvasCallbackPair; + +ARRAY_DEF(CanvasCallbackPairArray, CanvasCallbackPair, M_POD_OPLIST); + +#define M_OPL_CanvasCallbackPairArray_t() ARRAY_OPLIST(CanvasCallbackPairArray, M_POD_OPLIST) + +ALGO_DEF(CanvasCallbackPairArray, CanvasCallbackPairArray_t); + /** Gui structure */ struct Gui { // Thread and lock @@ -52,8 +64,7 @@ struct Gui { bool lockdown; ViewPortArray_t layers[GuiLayerMAX]; Canvas* canvas; - GuiCanvasCommitCallback canvas_callback; - void* canvas_callback_context; + CanvasCallbackPairArray_t canvas_callback_pair; // Input osMessageQueueId_t input_queue; diff --git a/applications/rpc/rpc.c b/applications/rpc/rpc.c index b67d036b..efd5ed87 100644 --- a/applications/rpc/rpc.c +++ b/applications/rpc/rpc.c @@ -623,7 +623,7 @@ RpcSession* rpc_session_open(Rpc* rpc) { rpc_add_handler(session, PB_Main_stop_session_tag, &rpc_handler); session->thread = furi_thread_alloc(); - furi_thread_set_name(session->thread, "RPC Session"); + furi_thread_set_name(session->thread, "RpcSessionWorker"); furi_thread_set_stack_size(session->thread, 2048); furi_thread_set_context(session->thread, session); furi_thread_set_callback(session->thread, rpc_session_worker); @@ -666,9 +666,10 @@ void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* han RpcHandlerDict_set_at(session->handlers, message_tag, *handler); } -void rpc_send_and_release(RpcSession* session, PB_Main* message) { +void rpc_send(RpcSession* session, PB_Main* message) { furi_assert(session); furi_assert(message); + pb_ostream_t ostream = PB_OSTREAM_SIZING; #if SRV_RPC_DEBUG @@ -695,6 +696,10 @@ void rpc_send_and_release(RpcSession* session, PB_Main* message) { osMutexRelease(session->callbacks_mutex); free(buffer); +} + +void rpc_send_and_release(RpcSession* session, PB_Main* message) { + rpc_send(session, message); pb_release(&PB_Main_msg, message); } diff --git a/applications/rpc/rpc_gui.c b/applications/rpc/rpc_gui.c index 1e4b0fe2..3a4e21f0 100644 --- a/applications/rpc/rpc_gui.c +++ b/applications/rpc/rpc_gui.c @@ -5,11 +5,25 @@ #define TAG "RpcGui" +typedef enum { + RpcGuiWorkerFlagTransmit = (1 << 0), + RpcGuiWorkerFlagExit = (1 << 1), +} RpcGuiWorkerFlag; + +#define RpcGuiWorkerFlagAny (RpcGuiWorkerFlagTransmit | RpcGuiWorkerFlagExit) + typedef struct { RpcSession* session; Gui* gui; + + // Receive part ViewPort* virtual_display_view_port; uint8_t* virtual_display_buffer; + + // Transmit + PB_Main* transmit_frame; + FuriThread* transmit_thread; + bool virtual_display_not_empty; bool is_streaming; } RpcGuiSystem; @@ -17,25 +31,35 @@ typedef struct { static void rpc_system_gui_screen_stream_frame_callback(uint8_t* data, size_t size, void* context) { furi_assert(data); - furi_assert(size == 1024); furi_assert(context); RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context; - RpcSession* session = rpc_gui->session; + uint8_t* buffer = rpc_gui->transmit_frame->content.gui_screen_frame.data->bytes; - PB_Main* frame = malloc(sizeof(PB_Main)); + furi_assert(size == rpc_gui->transmit_frame->content.gui_screen_frame.data->size); - frame->which_content = PB_Main_gui_screen_frame_tag; - frame->command_status = PB_CommandStatus_OK; - frame->content.gui_screen_frame.data = malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(size)); - uint8_t* buffer = frame->content.gui_screen_frame.data->bytes; - uint16_t* frame_size_msg = &frame->content.gui_screen_frame.data->size; - *frame_size_msg = size; memcpy(buffer, data, size); - rpc_send_and_release(session, frame); + osThreadFlagsSet( + furi_thread_get_thread_id(rpc_gui->transmit_thread), RpcGuiWorkerFlagTransmit); +} - free(frame); +static int32_t rpc_system_gui_screen_stream_frame_transmit_thread(void* context) { + furi_assert(context); + + RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context; + + while(true) { + uint32_t flags = osThreadFlagsWait(RpcGuiWorkerFlagAny, osFlagsWaitAny, osWaitForever); + if(flags & RpcGuiWorkerFlagTransmit) { + rpc_send(rpc_gui->session, rpc_gui->transmit_frame); + } + if(flags & RpcGuiWorkerFlagExit) { + break; + } + } + + return 0; } static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, void* context) { @@ -45,15 +69,30 @@ static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, v RpcSession* session = rpc_gui->session; furi_assert(session); + furi_assert(!rpc_gui->is_streaming); - if(gui_get_framebuffer_callback(rpc_gui->gui) == NULL) { - rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); - rpc_gui->is_streaming = true; - gui_set_framebuffer_callback( - rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); - } else { - rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_ERROR_BUSY); - } + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); + + rpc_gui->is_streaming = true; + size_t framebuffer_size = gui_get_framebuffer_size(rpc_gui->gui); + // Reusable Frame + rpc_gui->transmit_frame = malloc(sizeof(PB_Main)); + rpc_gui->transmit_frame->which_content = PB_Main_gui_screen_frame_tag; + rpc_gui->transmit_frame->command_status = PB_CommandStatus_OK; + rpc_gui->transmit_frame->content.gui_screen_frame.data = + malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(framebuffer_size)); + rpc_gui->transmit_frame->content.gui_screen_frame.data->size = framebuffer_size; + // Transmission thread for async TX + rpc_gui->transmit_thread = furi_thread_alloc(); + furi_thread_set_name(rpc_gui->transmit_thread, "GuiRpcWorker"); + furi_thread_set_callback( + rpc_gui->transmit_thread, rpc_system_gui_screen_stream_frame_transmit_thread); + furi_thread_set_context(rpc_gui->transmit_thread, rpc_gui); + furi_thread_set_stack_size(rpc_gui->transmit_thread, 1024); + furi_thread_start(rpc_gui->transmit_thread); + // GUI framebuffer callback + gui_add_framebuffer_callback( + rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); } static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, void* context) { @@ -66,7 +105,18 @@ static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, vo if(rpc_gui->is_streaming) { rpc_gui->is_streaming = false; - gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + // Remove GUI framebuffer callback + gui_remove_framebuffer_callback( + rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); + // Stop and release worker thread + osThreadFlagsSet( + furi_thread_get_thread_id(rpc_gui->transmit_thread), RpcGuiWorkerFlagExit); + furi_thread_join(rpc_gui->transmit_thread); + furi_thread_free(rpc_gui->transmit_thread); + // Release frame + pb_release(&PB_Main_msg, rpc_gui->transmit_frame); + free(rpc_gui->transmit_frame); + rpc_gui->transmit_frame = NULL; } rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); @@ -296,7 +346,8 @@ void rpc_system_gui_free(void* context) { } if(rpc_gui->is_streaming) { - gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + gui_remove_framebuffer_callback( + rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); } furi_record_close("gui"); free(rpc_gui); diff --git a/applications/rpc/rpc_i.h b/applications/rpc/rpc_i.h index a8ffff28..f84cc991 100644 --- a/applications/rpc/rpc_i.h +++ b/applications/rpc/rpc_i.h @@ -17,7 +17,10 @@ typedef struct { void* context; } RpcHandler; +void rpc_send(RpcSession* session, PB_Main* main_message); + void rpc_send_and_release(RpcSession* session, PB_Main* main_message); + void rpc_send_and_release_empty(RpcSession* session, uint32_t command_id, PB_CommandStatus status); void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* handler);