From 76b737f411c82f3fc962cba15cbf974dc69fe4c4 Mon Sep 17 00:00:00 2001 From: Nikolay Minaylov Date: Wed, 16 Mar 2022 11:52:11 +0300 Subject: [PATCH] [FL-2257] RPC Refactoring (#1021) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * rpc: support for multiple sessions * removed debug prints * code formatting fix * compact build fix Co-authored-by: あく --- applications/gui/gui.c | 8 + applications/gui/gui.h | 9 + applications/rpc/rpc.c | 502 +++++++++++++++--------------- applications/rpc/rpc_app.c | 23 +- applications/rpc/rpc_cli.c | 2 +- applications/rpc/rpc_gui.c | 88 ++++-- applications/rpc/rpc_i.h | 21 +- applications/rpc/rpc_storage.c | 130 +++++--- applications/rpc/rpc_system.c | 114 +++---- applications/tests/rpc/rpc_test.c | 291 ++++++++++++----- 10 files changed, 712 insertions(+), 476 deletions(-) diff --git a/applications/gui/gui.c b/applications/gui/gui.c index 7d34f5e2..06733543 100644 --- a/applications/gui/gui.c +++ b/applications/gui/gui.c @@ -408,6 +408,14 @@ void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, vo } } +GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui) { + furi_assert(gui); + gui_lock(gui); + GuiCanvasCommitCallback callback = gui->canvas_callback; + gui_unlock(gui); + return callback; +} + void gui_set_lockdown(Gui* gui, bool lockdown) { furi_assert(gui); gui_lock(gui); diff --git a/applications/gui/gui.h b/applications/gui/gui.h index 5c209362..39d81c79 100644 --- a/applications/gui/gui.h +++ b/applications/gui/gui.h @@ -79,6 +79,15 @@ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port); */ void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context); +/** Get gui canvas commit callback + * + * Can be used to check if some application is using framebufer + * + * @param gui Gui instance + * @return GuiCanvasCommitCallback + */ +GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui); + /** Set lockdown mode * * When lockdown mode is enabled, only GuiLayerDesktop is shown. diff --git a/applications/rpc/rpc.c b/applications/rpc/rpc.c index 7bdd9ab5..b67d036b 100644 --- a/applications/rpc/rpc.c +++ b/applications/rpc/rpc.c @@ -19,9 +19,12 @@ #define TAG "RpcSrv" -#define RPC_EVENT_NEW_DATA (1 << 0) -#define RPC_EVENT_DISCONNECT (1 << 1) -#define RPC_EVENTS_ALL (RPC_EVENT_DISCONNECT | RPC_EVENT_NEW_DATA) +typedef enum { + RpcEvtNewData = (1 << 0), + RpcEvtDisconnect = (1 << 1), +} RpcEvtFlags; + +#define RPC_ALL_EVENTS (RpcEvtNewData | RpcEvtDisconnect) DICT_DEF2(RpcHandlerDict, pb_size_t, M_DEFAULT_OPLIST, RpcHandler, M_POD_OPLIST) @@ -51,44 +54,45 @@ static RpcSystemCallbacks rpc_systems[] = { }; struct RpcSession { + Rpc* rpc; + + FuriThread* thread; + + RpcHandlerDict_t handlers; + StreamBufferHandle_t stream; + PB_Main* decoded_message; + bool terminate; + void** system_contexts; + bool decode_error; + + osMutexId_t callbacks_mutex; RpcSendBytesCallback send_bytes_callback; RpcBufferIsEmptyCallback buffer_is_empty_callback; RpcSessionClosedCallback closed_callback; RpcSessionTerminatedCallback terminated_callback; void* context; - osMutexId_t callbacks_mutex; - Rpc* rpc; - bool terminate; - void** system_contexts; - bool decode_error; }; struct Rpc { - bool busy; osMutexId_t busy_mutex; - RpcSession session; - osEventFlagsId_t events; - StreamBufferHandle_t stream; - RpcHandlerDict_t handlers; - PB_Main* decoded_message; }; static bool content_callback(pb_istream_t* stream, const pb_field_t* field, void** arg); -static void rpc_close_session_process(const PB_Main* msg_request, void* context) { - furi_assert(msg_request); - furi_assert(context); +static void rpc_close_session_process(const PB_Main* request, void* context) { + furi_assert(request); - Rpc* rpc = context; + RpcSession* session = (RpcSession*)context; + furi_assert(session); - rpc_send_and_release_empty(rpc, msg_request->command_id, PB_CommandStatus_OK); - osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever); - if(rpc->session.closed_callback) { - rpc->session.closed_callback(rpc->session.context); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); + osMutexAcquire(session->callbacks_mutex, osWaitForever); + if(session->closed_callback) { + session->closed_callback(session->context); } else { - FURI_LOG_W(TAG, "Session stop doesn't processed by transport layer"); + FURI_LOG_W(TAG, "Session stop isn't processed by transport layer"); } - osMutexRelease(rpc->session.callbacks_mutex); + osMutexRelease(session->callbacks_mutex); } static size_t rpc_sprintf_msg_file( @@ -349,94 +353,8 @@ void rpc_print_message(const PB_Main* message) { string_clear(str); } -static Rpc* rpc_alloc(void) { - Rpc* rpc = malloc(sizeof(Rpc)); - rpc->busy_mutex = osMutexNew(NULL); - rpc->busy = false; - rpc->events = osEventFlagsNew(NULL); - rpc->stream = xStreamBufferCreate(RPC_BUFFER_SIZE, 1); - - rpc->decoded_message = malloc(sizeof(PB_Main)); - rpc->decoded_message->cb_content.funcs.decode = content_callback; - rpc->decoded_message->cb_content.arg = rpc; - - RpcHandlerDict_init(rpc->handlers); - - return rpc; -} - -RpcSession* rpc_session_open(Rpc* rpc) { - furi_assert(rpc); - bool result = false; - furi_check(osMutexAcquire(rpc->busy_mutex, osWaitForever) == osOK); - if(rpc->busy) { - result = false; - } else { - rpc->busy = true; - result = true; - } - furi_check(osMutexRelease(rpc->busy_mutex) == osOK); - - if(result) { - RpcSession* session = &rpc->session; - session->callbacks_mutex = osMutexNew(NULL); - session->rpc = rpc; - session->terminate = false; - session->decode_error = false; - xStreamBufferReset(rpc->stream); - - session->system_contexts = malloc(COUNT_OF(rpc_systems) * sizeof(void*)); - for(int i = 0; i < COUNT_OF(rpc_systems); ++i) { - session->system_contexts[i] = rpc_systems[i].alloc(rpc); - } - - RpcHandler rpc_handler = { - .message_handler = rpc_close_session_process, - .decode_submessage = NULL, - .context = rpc, - }; - rpc_add_handler(rpc, PB_Main_stop_session_tag, &rpc_handler); - - FURI_LOG_D(TAG, "Session started"); - } - - return result ? &rpc->session : NULL; /* support 1 open session for now */ -} - -void rpc_session_close(RpcSession* session) { - furi_assert(session); - furi_assert(session->rpc); - furi_assert(session->rpc->busy); - - rpc_session_set_send_bytes_callback(session, NULL); - rpc_session_set_close_callback(session, NULL); - rpc_session_set_buffer_is_empty_callback(session, NULL); - osEventFlagsSet(session->rpc->events, RPC_EVENT_DISCONNECT); -} - -static void rpc_free_session(RpcSession* session) { - furi_assert(session); - - for(int i = 0; i < COUNT_OF(rpc_systems); ++i) { - if(rpc_systems[i].free) { - rpc_systems[i].free(session->system_contexts[i]); - } - } - free(session->system_contexts); - osMutexDelete(session->callbacks_mutex); - RpcHandlerDict_reset(session->rpc->handlers); - - session->context = NULL; - session->closed_callback = NULL; - session->send_bytes_callback = NULL; - session->buffer_is_empty_callback = NULL; - session->terminated_callback = NULL; -} - void rpc_session_set_context(RpcSession* session, void* context) { furi_assert(session); - furi_assert(session->rpc); - furi_assert(session->rpc->busy); osMutexAcquire(session->callbacks_mutex, osWaitForever); session->context = context; @@ -445,8 +363,6 @@ void rpc_session_set_context(RpcSession* session, void* context) { void rpc_session_set_close_callback(RpcSession* session, RpcSessionClosedCallback callback) { furi_assert(session); - furi_assert(session->rpc); - furi_assert(session->rpc->busy); osMutexAcquire(session->callbacks_mutex, osWaitForever); session->closed_callback = callback; @@ -455,8 +371,6 @@ void rpc_session_set_close_callback(RpcSession* session, RpcSessionClosedCallbac void rpc_session_set_send_bytes_callback(RpcSession* session, RpcSendBytesCallback callback) { furi_assert(session); - furi_assert(session->rpc); - furi_assert(session->rpc->busy); osMutexAcquire(session->callbacks_mutex, osWaitForever); session->send_bytes_callback = callback; @@ -467,7 +381,6 @@ void rpc_session_set_buffer_is_empty_callback( RpcSession* session, RpcBufferIsEmptyCallback callback) { furi_assert(session); - furi_assert(session->rpc->busy); osMutexAcquire(session->callbacks_mutex, osWaitForever); session->buffer_is_empty_callback = callback; @@ -478,8 +391,6 @@ void rpc_session_set_terminated_callback( RpcSession* session, RpcSessionTerminatedCallback callback) { furi_assert(session); - furi_assert(session->rpc); - furi_assert(session->rpc->busy); osMutexAcquire(session->callbacks_mutex, osWaitForever); session->terminated_callback = callback; @@ -495,54 +406,54 @@ void rpc_session_set_terminated_callback( size_t rpc_session_feed(RpcSession* session, uint8_t* encoded_bytes, size_t size, TickType_t timeout) { furi_assert(session); - Rpc* rpc = session->rpc; - furi_assert(rpc->busy); + size_t bytes_sent = xStreamBufferSend(session->stream, encoded_bytes, size, timeout); - size_t bytes_sent = xStreamBufferSend(rpc->stream, encoded_bytes, size, timeout); - osEventFlagsSet(rpc->events, RPC_EVENT_NEW_DATA); + osThreadFlagsSet(furi_thread_get_thread_id(session->thread), RpcEvtNewData); return bytes_sent; } size_t rpc_session_get_available_size(RpcSession* session) { furi_assert(session); - Rpc* rpc = session->rpc; - return xStreamBufferSpacesAvailable(rpc->stream); + return xStreamBufferSpacesAvailable(session->stream); } bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) { - Rpc* rpc = istream->state; + RpcSession* session = istream->state; + furi_assert(session); + furi_assert(istream->bytes_left); + uint32_t flags = 0; size_t bytes_received = 0; - furi_assert(istream->bytes_left); - while(1) { bytes_received += - xStreamBufferReceive(rpc->stream, buf + bytes_received, count - bytes_received, 0); - if(xStreamBufferIsEmpty(rpc->stream)) { - if(rpc->session.buffer_is_empty_callback) { - rpc->session.buffer_is_empty_callback(rpc->session.context); + xStreamBufferReceive(session->stream, buf + bytes_received, count - bytes_received, 0); + if(xStreamBufferIsEmpty(session->stream)) { + if(session->buffer_is_empty_callback) { + session->buffer_is_empty_callback(session->context); } } - if(rpc->session.decode_error) { + if(session->decode_error) { /* never go out till RPC_EVENT_DISCONNECT come */ bytes_received = 0; } if(count == bytes_received) { break; } else { - flags = osEventFlagsWait(rpc->events, RPC_EVENTS_ALL, 0, osWaitForever); - if(flags & RPC_EVENT_DISCONNECT) { - if(xStreamBufferIsEmpty(rpc->stream)) { - rpc->session.terminate = true; + flags = osThreadFlagsWait(RPC_ALL_EVENTS, osFlagsWaitAny, osWaitForever); + if(flags & RpcEvtDisconnect) { + if(xStreamBufferIsEmpty(session->stream)) { + session->terminate = true; istream->bytes_left = 0; bytes_received = 0; break; } else { /* Save disconnect flag and continue reading buffer */ - osEventFlagsSet(rpc->events, RPC_EVENT_DISCONNECT); + osThreadFlagsSet(furi_thread_get_thread_id(session->thread), RpcEvtDisconnect); } + } else if(flags & RpcEvtNewData) { + // Just wake thread up } } } @@ -554,10 +465,210 @@ bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) { return (count == bytes_received); } -void rpc_send_and_release(Rpc* rpc, PB_Main* message) { +static bool content_callback(pb_istream_t* stream, const pb_field_t* field, void** arg) { + furi_assert(stream); + RpcSession* session = stream->state; + furi_assert(session); + + RpcHandler* handler = RpcHandlerDict_get(session->handlers, field->tag); + + if(handler && handler->decode_submessage) { + handler->decode_submessage(stream, field, arg); + } + + return true; +} + +static int32_t rpc_session_worker(void* context) { + furi_assert(context); + RpcSession* session = (RpcSession*)context; + Rpc* rpc = session->rpc; + + FURI_LOG_D(TAG, "Session started"); + + while(1) { + pb_istream_t istream = { + .callback = rpc_pb_stream_read, + .state = session, + .errmsg = NULL, + .bytes_left = RPC_MAX_MESSAGE_SIZE, /* max incoming message size */ + }; + + bool message_decode_failed = false; + + if(pb_decode_ex(&istream, &PB_Main_msg, session->decoded_message, PB_DECODE_DELIMITED)) { +#if SRV_RPC_DEBUG + FURI_LOG_I(TAG, "INPUT:"); + rpc_print_message(session->decoded_message); +#endif + RpcHandler* handler = + RpcHandlerDict_get(session->handlers, session->decoded_message->which_content); + + if(handler && handler->message_handler) { + furi_check(osMutexAcquire(rpc->busy_mutex, osWaitForever) == osOK); + handler->message_handler(session->decoded_message, handler->context); + furi_check(osMutexRelease(rpc->busy_mutex) == osOK); + } else if(session->decoded_message->which_content == 0) { + /* Receiving zeroes means message is 0-length, which + * is valid for proto3: all fields are filled with default values. + * 0 - is default value for which_content field. + * Mark it as decode error, because there is no content message + * in Main message with tag 0. + */ + message_decode_failed = true; + } else if(!handler && !session->terminate) { + FURI_LOG_E( + TAG, + "Message(%d) decoded, but not implemented", + session->decoded_message->which_content); + rpc_send_and_release_empty( + session, + session->decoded_message->command_id, + PB_CommandStatus_ERROR_NOT_IMPLEMENTED); + } + } else { + message_decode_failed = true; + } + + if(message_decode_failed) { + xStreamBufferReset(session->stream); + if(!session->terminate) { + /* Protobuf can't determine start and end of message. + * Handle this by adding varint at beginning + * of a message (PB_ENCODE_DELIMITED). But decoding fail + * means we can't be sure next bytes are varint for next + * message, so the only way to close session. + * RPC itself can't make decision to close session. It has + * to notify: + * 1) down layer (transport) + * 2) other side (companion app) + * Who are responsible to handle RPC session lifecycle. + * Companion receives 2 messages: ERROR_DECODE and session_closed. + */ + FURI_LOG_E(TAG, "Decode failed, error: \'%.128s\'", PB_GET_ERROR(&istream)); + session->decode_error = true; + rpc_send_and_release_empty(session, 0, PB_CommandStatus_ERROR_DECODE); + osMutexAcquire(session->callbacks_mutex, osWaitForever); + if(session->closed_callback) { + session->closed_callback(session->context); + } + osMutexRelease(session->callbacks_mutex); + } + } + + pb_release(&PB_Main_msg, session->decoded_message); + + if(session->terminate) { + FURI_LOG_D(TAG, "Session terminated"); + break; + } + } + + return 0; +} + +static void rpc_session_free_callback(FuriThreadState thread_state, void* context) { + furi_assert(context); + + RpcSession* session = (RpcSession*)context; + + if(thread_state == FuriThreadStateStopped) { + for(int i = 0; i < COUNT_OF(rpc_systems); ++i) { + if(rpc_systems[i].free) { + rpc_systems[i].free(session->system_contexts[i]); + } + } + free(session->system_contexts); + free(session->decoded_message); + RpcHandlerDict_clear(session->handlers); + vStreamBufferDelete(session->stream); + + osMutexAcquire(session->callbacks_mutex, osWaitForever); + if(session->terminated_callback) { + session->terminated_callback(session->context); + } + osMutexRelease(session->callbacks_mutex); + + osMutexDelete(session->callbacks_mutex); + furi_thread_free(session->thread); + free(session); + } +} + +RpcSession* rpc_session_open(Rpc* rpc) { furi_assert(rpc); + + RpcSession* session = malloc(sizeof(RpcSession)); + session->callbacks_mutex = osMutexNew(NULL); + session->stream = xStreamBufferCreate(RPC_BUFFER_SIZE, 1); + session->rpc = rpc; + session->terminate = false; + session->decode_error = false; + RpcHandlerDict_init(session->handlers); + + session->decoded_message = malloc(sizeof(PB_Main)); + session->decoded_message->cb_content.funcs.decode = content_callback; + session->decoded_message->cb_content.arg = session; + + session->system_contexts = malloc(COUNT_OF(rpc_systems) * sizeof(void*)); + for(int i = 0; i < COUNT_OF(rpc_systems); ++i) { + session->system_contexts[i] = rpc_systems[i].alloc(session); + } + + RpcHandler rpc_handler = { + .message_handler = rpc_close_session_process, + .decode_submessage = NULL, + .context = session, + }; + rpc_add_handler(session, PB_Main_stop_session_tag, &rpc_handler); + + session->thread = furi_thread_alloc(); + furi_thread_set_name(session->thread, "RPC Session"); + furi_thread_set_stack_size(session->thread, 2048); + furi_thread_set_context(session->thread, session); + furi_thread_set_callback(session->thread, rpc_session_worker); + + furi_thread_set_state_context(session->thread, session); + furi_thread_set_state_callback(session->thread, rpc_session_free_callback); + + furi_thread_start(session->thread); + + return session; +} + +void rpc_session_close(RpcSession* session) { + furi_assert(session); + furi_assert(session->rpc); + + rpc_session_set_send_bytes_callback(session, NULL); + rpc_session_set_close_callback(session, NULL); + rpc_session_set_buffer_is_empty_callback(session, NULL); + osThreadFlagsSet(furi_thread_get_thread_id(session->thread), RpcEvtDisconnect); +} + +int32_t rpc_srv(void* p) { + Rpc* rpc = malloc(sizeof(Rpc)); + + rpc->busy_mutex = osMutexNew(NULL); + + Cli* cli = furi_record_open("cli"); + cli_add_command( + cli, "start_rpc_session", CliCommandFlagParallelSafe, rpc_cli_command_start_session, rpc); + + furi_record_create("rpc", rpc); + + return 0; +} + +void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* handler) { + furi_assert(RpcHandlerDict_get(session->handlers, message_tag) == NULL); + + RpcHandlerDict_set_at(session->handlers, message_tag, *handler); +} + +void rpc_send_and_release(RpcSession* session, PB_Main* message) { + furi_assert(session); furi_assert(message); - RpcSession* session = &rpc->session; pb_ostream_t ostream = PB_OSTREAM_SIZING; #if SRV_RPC_DEBUG @@ -587,124 +698,13 @@ void rpc_send_and_release(Rpc* rpc, PB_Main* message) { pb_release(&PB_Main_msg, message); } -static bool content_callback(pb_istream_t* stream, const pb_field_t* field, void** arg) { - furi_assert(stream); - Rpc* rpc = *arg; - - RpcHandler* handler = RpcHandlerDict_get(rpc->handlers, field->tag); - - if(handler && handler->decode_submessage) { - handler->decode_submessage(stream, field, arg); - } - - return true; -} - -int32_t rpc_srv(void* p) { - Rpc* rpc = rpc_alloc(); - furi_record_create("rpc", rpc); - - Cli* cli = furi_record_open("cli"); - - cli_add_command( - cli, "start_rpc_session", CliCommandFlagParallelSafe, rpc_cli_command_start_session, rpc); - - while(1) { - pb_istream_t istream = { - .callback = rpc_pb_stream_read, - .state = rpc, - .errmsg = NULL, - .bytes_left = RPC_MAX_MESSAGE_SIZE, /* max incoming message size */ - }; - - bool message_decode_failed = false; - - if(pb_decode_ex(&istream, &PB_Main_msg, rpc->decoded_message, PB_DECODE_DELIMITED)) { -#if SRV_RPC_DEBUG - FURI_LOG_I(TAG, "INPUT:"); - rpc_print_message(rpc->decoded_message); -#endif - RpcHandler* handler = - RpcHandlerDict_get(rpc->handlers, rpc->decoded_message->which_content); - - if(handler && handler->message_handler) { - handler->message_handler(rpc->decoded_message, handler->context); - } else if(rpc->decoded_message->which_content == 0) { - /* Receiving zeroes means message is 0-length, which - * is valid for proto3: all fields are filled with default values. - * 0 - is default value for which_content field. - * Mark it as decode error, because there is no content message - * in Main message with tag 0. - */ - message_decode_failed = true; - } else if(!handler && !rpc->session.terminate) { - FURI_LOG_E( - TAG, - "Message(%d) decoded, but not implemented", - rpc->decoded_message->which_content); - rpc_send_and_release_empty( - rpc, rpc->decoded_message->command_id, PB_CommandStatus_ERROR_NOT_IMPLEMENTED); - } - } else { - message_decode_failed = true; - } - - if(message_decode_failed) { - xStreamBufferReset(rpc->stream); - if(!rpc->session.terminate) { - /* Protobuf can't determine start and end of message. - * Handle this by adding varint at beginning - * of a message (PB_ENCODE_DELIMITED). But decoding fail - * means we can't be sure next bytes are varint for next - * message, so the only way to close session. - * RPC itself can't make decision to close session. It has - * to notify: - * 1) down layer (transport) - * 2) other side (companion app) - * Who are responsible to handle RPC session lifecycle. - * Companion receives 2 messages: ERROR_DECODE and session_closed. - */ - FURI_LOG_E(TAG, "Decode failed, error: \'%.128s\'", PB_GET_ERROR(&istream)); - rpc->session.decode_error = true; - rpc_send_and_release_empty(rpc, 0, PB_CommandStatus_ERROR_DECODE); - osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever); - if(rpc->session.closed_callback) { - rpc->session.closed_callback(rpc->session.context); - } - osMutexRelease(rpc->session.callbacks_mutex); - } - } - - pb_release(&PB_Main_msg, rpc->decoded_message); - - if(rpc->session.terminate) { - FURI_LOG_D(TAG, "Session terminated"); - osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever); - if(rpc->session.terminated_callback) { - rpc->session.terminated_callback(rpc->session.context); - } - osMutexRelease(rpc->session.callbacks_mutex); - osEventFlagsClear(rpc->events, RPC_EVENTS_ALL); - rpc_free_session(&rpc->session); - rpc->busy = false; - } - } - return 0; -} - -void rpc_add_handler(Rpc* rpc, pb_size_t message_tag, RpcHandler* handler) { - furi_assert(RpcHandlerDict_get(rpc->handlers, message_tag) == NULL); - - RpcHandlerDict_set_at(rpc->handlers, message_tag, *handler); -} - -void rpc_send_and_release_empty(Rpc* rpc, uint32_t command_id, PB_CommandStatus status) { +void rpc_send_and_release_empty(RpcSession* session, uint32_t command_id, PB_CommandStatus status) { PB_Main message = { .command_id = command_id, .command_status = status, .has_next = false, .which_content = PB_Main_empty_tag, }; - rpc_send_and_release(rpc, &message); + rpc_send_and_release(session, &message); pb_release(&PB_Main_msg, &message); } diff --git a/applications/rpc/rpc_app.c b/applications/rpc/rpc_app.c index 136edfc1..c35decf0 100644 --- a/applications/rpc/rpc_app.c +++ b/applications/rpc/rpc_app.c @@ -5,10 +5,11 @@ #include static void rpc_system_app_start_process(const PB_Main* request, void* context) { - Rpc* rpc = context; - furi_assert(rpc); furi_assert(request); furi_assert(request->which_content == PB_Main_app_start_request_tag); + RpcSession* session = (RpcSession*)context; + furi_assert(session); + PB_CommandStatus result = PB_CommandStatus_ERROR_APP_CANT_START; Loader* loader = furi_record_open("loader"); @@ -33,14 +34,14 @@ static void rpc_system_app_start_process(const PB_Main* request, void* context) furi_record_close("loader"); - rpc_send_and_release_empty(rpc, request->command_id, result); + rpc_send_and_release_empty(session, request->command_id, result); } static void rpc_system_app_lock_status_process(const PB_Main* request, void* context) { - Rpc* rpc = context; - furi_assert(rpc); furi_assert(request); furi_assert(request->which_content == PB_Main_app_lock_status_request_tag); + RpcSession* session = (RpcSession*)context; + furi_assert(session); Loader* loader = furi_record_open("loader"); @@ -55,24 +56,24 @@ static void rpc_system_app_lock_status_process(const PB_Main* request, void* con furi_record_close("loader"); - rpc_send_and_release(rpc, &response); + rpc_send_and_release(session, &response); pb_release(&PB_Main_msg, &response); } -void* rpc_system_app_alloc(Rpc* rpc) { - furi_assert(rpc); +void* rpc_system_app_alloc(RpcSession* session) { + furi_assert(session); RpcHandler rpc_handler = { .message_handler = NULL, .decode_submessage = NULL, - .context = rpc, + .context = session, }; rpc_handler.message_handler = rpc_system_app_start_process; - rpc_add_handler(rpc, PB_Main_app_start_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_app_start_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_app_lock_status_process; - rpc_add_handler(rpc, PB_Main_app_lock_status_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_app_lock_status_request_tag, &rpc_handler); return NULL; } diff --git a/applications/rpc/rpc_cli.c b/applications/rpc/rpc_cli.c index 55cc3e49..36ae9ef3 100644 --- a/applications/rpc/rpc_cli.c +++ b/applications/rpc/rpc_cli.c @@ -31,7 +31,7 @@ void rpc_cli_command_start_session(Cli* cli, string_t args, void* context) { RpcSession* rpc_session = rpc_session_open(rpc); if(rpc_session == NULL) { - printf("Another session is in progress\r\n"); + printf("Session start error\r\n"); return; } diff --git a/applications/rpc/rpc_gui.c b/applications/rpc/rpc_gui.c index 7ee637a0..1e4b0fe2 100644 --- a/applications/rpc/rpc_gui.c +++ b/applications/rpc/rpc_gui.c @@ -6,11 +6,12 @@ #define TAG "RpcGui" typedef struct { - Rpc* rpc; + RpcSession* session; Gui* gui; ViewPort* virtual_display_view_port; uint8_t* virtual_display_buffer; bool virtual_display_not_empty; + bool is_streaming; } RpcGuiSystem; static void @@ -19,7 +20,8 @@ static void furi_assert(size == 1024); furi_assert(context); - RpcGuiSystem* rpc_gui = context; + RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context; + RpcSession* session = rpc_gui->session; PB_Main* frame = malloc(sizeof(PB_Main)); @@ -31,7 +33,7 @@ static void *frame_size_msg = size; memcpy(buffer, data, size); - rpc_send_and_release(rpc_gui->rpc, frame); + rpc_send_and_release(session, frame); free(frame); } @@ -41,20 +43,33 @@ static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, v furi_assert(context); RpcGuiSystem* rpc_gui = context; - rpc_send_and_release_empty(rpc_gui->rpc, request->command_id, PB_CommandStatus_OK); + RpcSession* session = rpc_gui->session; + furi_assert(session); - gui_set_framebuffer_callback( - rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); + if(gui_get_framebuffer_callback(rpc_gui->gui) == NULL) { + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); + rpc_gui->is_streaming = true; + gui_set_framebuffer_callback( + rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context); + } else { + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_ERROR_BUSY); + } } static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(context); + RpcGuiSystem* rpc_gui = context; + RpcSession* session = rpc_gui->session; + furi_assert(session); - gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + if(rpc_gui->is_streaming) { + rpc_gui->is_streaming = false; + gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + } - rpc_send_and_release_empty(rpc_gui->rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void @@ -62,7 +77,10 @@ static void furi_assert(request); furi_assert(request->which_content == PB_Main_gui_send_input_event_request_tag); furi_assert(context); + RpcGuiSystem* rpc_gui = context; + RpcSession* session = rpc_gui->session; + furi_assert(session); InputEvent event; @@ -117,7 +135,7 @@ static void if(invalid) { rpc_send_and_release_empty( - rpc_gui->rpc, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); + session, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); return; } @@ -125,12 +143,13 @@ static void furi_check(input_events); furi_pubsub_publish(input_events, &event); furi_record_close("input_events"); - rpc_send_and_release_empty(rpc_gui->rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void rpc_system_gui_virtual_display_render_callback(Canvas* canvas, void* context) { furi_assert(canvas); furi_assert(context); + RpcGuiSystem* rpc_gui = context; if(!rpc_gui->virtual_display_not_empty) { @@ -146,13 +165,14 @@ static void rpc_system_gui_virtual_display_render_callback(Canvas* canvas, void* static void rpc_system_gui_start_virtual_display_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(context); + RpcGuiSystem* rpc_gui = context; + RpcSession* session = rpc_gui->session; + furi_assert(session); if(rpc_gui->virtual_display_view_port) { rpc_send_and_release_empty( - rpc_gui->rpc, - request->command_id, - PB_CommandStatus_ERROR_VIRTUAL_DISPLAY_ALREADY_STARTED); + session, request->command_id, PB_CommandStatus_ERROR_VIRTUAL_DISPLAY_ALREADY_STARTED); return; } @@ -178,17 +198,20 @@ static void rpc_system_gui_start_virtual_display_process(const PB_Main* request, rpc_gui); gui_add_view_port(rpc_gui->gui, rpc_gui->virtual_display_view_port, GuiLayerFullscreen); - rpc_send_and_release_empty(rpc_gui->rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void rpc_system_gui_stop_virtual_display_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(context); + RpcGuiSystem* rpc_gui = context; + RpcSession* session = rpc_gui->session; + furi_assert(session); if(!rpc_gui->virtual_display_view_port) { rpc_send_and_release_empty( - rpc_gui->rpc, request->command_id, PB_CommandStatus_ERROR_VIRTUAL_DISPLAY_NOT_STARTED); + session, request->command_id, PB_CommandStatus_ERROR_VIRTUAL_DISPLAY_NOT_STARTED); return; } @@ -198,13 +221,16 @@ static void rpc_system_gui_stop_virtual_display_process(const PB_Main* request, rpc_gui->virtual_display_view_port = NULL; rpc_gui->virtual_display_not_empty = false; - rpc_send_and_release_empty(rpc_gui->rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void rpc_system_gui_virtual_display_frame_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(context); + RpcGuiSystem* rpc_gui = context; + RpcSession* session = rpc_gui->session; + furi_assert(session); if(!rpc_gui->virtual_display_view_port) { FURI_LOG_W(TAG, "Virtual display is not started, ignoring incoming frame packet"); @@ -218,14 +244,16 @@ static void rpc_system_gui_virtual_display_frame_process(const PB_Main* request, buffer_size); rpc_gui->virtual_display_not_empty = true; view_port_update(rpc_gui->virtual_display_view_port); + + (void)session; } -void* rpc_system_gui_alloc(Rpc* rpc) { - furi_assert(rpc); +void* rpc_system_gui_alloc(RpcSession* session) { + furi_assert(session); RpcGuiSystem* rpc_gui = malloc(sizeof(RpcGuiSystem)); rpc_gui->gui = furi_record_open("gui"); - rpc_gui->rpc = rpc; + rpc_gui->session = session; RpcHandler rpc_handler = { .message_handler = NULL, @@ -234,29 +262,29 @@ void* rpc_system_gui_alloc(Rpc* rpc) { }; rpc_handler.message_handler = rpc_system_gui_start_screen_stream_process; - rpc_add_handler(rpc, PB_Main_gui_start_screen_stream_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_start_screen_stream_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_gui_stop_screen_stream_process; - rpc_add_handler(rpc, PB_Main_gui_stop_screen_stream_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_stop_screen_stream_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_gui_send_input_event_request_process; - rpc_add_handler(rpc, PB_Main_gui_send_input_event_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_send_input_event_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_gui_start_virtual_display_process; - rpc_add_handler(rpc, PB_Main_gui_start_virtual_display_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_start_virtual_display_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_gui_stop_virtual_display_process; - rpc_add_handler(rpc, PB_Main_gui_stop_virtual_display_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_stop_virtual_display_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_gui_virtual_display_frame_process; - rpc_add_handler(rpc, PB_Main_gui_screen_frame_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_gui_screen_frame_tag, &rpc_handler); return rpc_gui; } -void rpc_system_gui_free(void* ctx) { - furi_assert(ctx); - RpcGuiSystem* rpc_gui = ctx; +void rpc_system_gui_free(void* context) { + furi_assert(context); + RpcGuiSystem* rpc_gui = context; furi_assert(rpc_gui->gui); if(rpc_gui->virtual_display_view_port) { @@ -267,7 +295,9 @@ void rpc_system_gui_free(void* ctx) { rpc_gui->virtual_display_not_empty = false; } - gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + if(rpc_gui->is_streaming) { + gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL); + } furi_record_close("gui"); free(rpc_gui); } diff --git a/applications/rpc/rpc_i.h b/applications/rpc/rpc_i.h index 904129f5..a8ffff28 100644 --- a/applications/rpc/rpc_i.h +++ b/applications/rpc/rpc_i.h @@ -7,8 +7,8 @@ #include #include -typedef void* (*RpcSystemAlloc)(Rpc*); -typedef void (*RpcSystemFree)(void*); +typedef void* (*RpcSystemAlloc)(RpcSession* session); +typedef void (*RpcSystemFree)(void* context); typedef void (*PBMessageHandler)(const PB_Main* msg_request, void* context); typedef struct { @@ -17,18 +17,19 @@ typedef struct { void* context; } RpcHandler; -void rpc_send_and_release(Rpc* rpc, PB_Main* main_message); -void rpc_send_and_release_empty(Rpc* rpc, uint32_t command_id, PB_CommandStatus status); -void rpc_add_handler(Rpc* rpc, pb_size_t message_tag, RpcHandler* handler); +void rpc_send_and_release(RpcSession* session, PB_Main* main_message); +void rpc_send_and_release_empty(RpcSession* session, uint32_t command_id, PB_CommandStatus status); -void* rpc_system_system_alloc(Rpc* rpc); -void* rpc_system_storage_alloc(Rpc* rpc); +void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* handler); + +void* rpc_system_system_alloc(RpcSession* session); +void* rpc_system_storage_alloc(RpcSession* session); void rpc_system_storage_free(void* ctx); -void* rpc_system_app_alloc(Rpc* rpc); -void* rpc_system_gui_alloc(Rpc* rpc); +void* rpc_system_app_alloc(RpcSession* session); +void* rpc_system_gui_alloc(RpcSession* session); void rpc_system_gui_free(void* ctx); void rpc_print_message(const PB_Main* message); void rpc_cli_command_start_session(Cli* cli, string_t args, void* context); -PB_CommandStatus rpc_system_storage_get_error(FS_Error fs_error); \ No newline at end of file +PB_CommandStatus rpc_system_storage_get_error(FS_Error fs_error); diff --git a/applications/rpc/rpc_storage.c b/applications/rpc/rpc_storage.c index ed43e48d..b70ec4cc 100644 --- a/applications/rpc/rpc_storage.c +++ b/applications/rpc/rpc_storage.c @@ -21,7 +21,7 @@ typedef enum { } RpcStorageState; typedef struct { - Rpc* rpc; + RpcSession* session; Storage* api; File* file; RpcStorageState state; @@ -30,13 +30,16 @@ typedef struct { void rpc_print_message(const PB_Main* message); -static void rpc_system_storage_reset_state(RpcStorageSystem* rpc_storage, bool send_error) { +static void rpc_system_storage_reset_state( + RpcStorageSystem* rpc_storage, + RpcSession* session, + bool send_error) { furi_assert(rpc_storage); if(rpc_storage->state != RpcStorageStateIdle) { if(send_error) { rpc_send_and_release_empty( - rpc_storage->rpc, + session, rpc_storage->current_command_id, PB_CommandStatus_ERROR_CONTINUOUS_COMMAND_INTERRUPTED); } @@ -102,7 +105,10 @@ static void rpc_system_storage_info_process(const PB_Main* request, void* contex furi_assert(request->which_content == PB_Main_storage_info_request_tag); RpcStorageSystem* rpc_storage = context; - rpc_system_storage_reset_state(rpc_storage, true); + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, true); PB_Main* response = malloc(sizeof(PB_Main)); response->command_id = request->command_id; @@ -122,7 +128,7 @@ static void rpc_system_storage_info_process(const PB_Main* request, void* contex response->which_content = PB_Main_empty_tag; } - rpc_send_and_release(rpc_storage->rpc, response); + rpc_send_and_release(session, response); free(response); furi_record_close("storage"); } @@ -133,7 +139,10 @@ static void rpc_system_storage_stat_process(const PB_Main* request, void* contex furi_assert(request->which_content == PB_Main_storage_stat_request_tag); RpcStorageSystem* rpc_storage = context; - rpc_system_storage_reset_state(rpc_storage, true); + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, true); PB_Main* response = malloc(sizeof(PB_Main)); response->command_id = request->command_id; @@ -156,13 +165,16 @@ static void rpc_system_storage_stat_process(const PB_Main* request, void* contex response->content.storage_stat_response.file.size = fileinfo.size; } - rpc_send_and_release(rpc_storage->rpc, response); + rpc_send_and_release(session, response); free(response); furi_record_close("storage"); } static void rpc_system_storage_list_root(const PB_Main* request, void* context) { RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + const char* hard_coded_dirs[] = {"any", "int", "ext"}; PB_Main response = { @@ -183,7 +195,7 @@ static void rpc_system_storage_list_root(const PB_Main* request, void* context) response.content.storage_list_response.file[i].name = str; } - rpc_send_and_release(rpc_storage->rpc, &response); + rpc_send_and_release(session, &response); } static void rpc_system_storage_list_process(const PB_Main* request, void* context) { @@ -192,7 +204,10 @@ static void rpc_system_storage_list_process(const PB_Main* request, void* contex furi_assert(request->which_content == PB_Main_storage_list_request_tag); RpcStorageSystem* rpc_storage = context; - rpc_system_storage_reset_state(rpc_storage, true); + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, true); if(!strcmp(request->content.storage_list_request.path, "/")) { rpc_system_storage_list_root(request, context); @@ -226,7 +241,7 @@ static void rpc_system_storage_list_process(const PB_Main* request, void* contex if(i == COUNT_OF(list->file)) { list->file_count = i; response.has_next = true; - rpc_send_and_release(rpc_storage->rpc, &response); + rpc_send_and_release(session, &response); i = 0; } list->file[i].type = (fileinfo.flags & FSF_DIRECTORY) ? PB_Storage_File_FileType_DIR : @@ -243,7 +258,7 @@ static void rpc_system_storage_list_process(const PB_Main* request, void* contex } response.has_next = false; - rpc_send_and_release(rpc_storage->rpc, &response); + rpc_send_and_release(session, &response); storage_dir_close(dir); storage_file_free(dir); @@ -253,10 +268,14 @@ static void rpc_system_storage_list_process(const PB_Main* request, void* contex static void rpc_system_storage_read_process(const PB_Main* request, void* context) { furi_assert(request); + furi_assert(context); furi_assert(request->which_content == PB_Main_storage_read_request_tag); RpcStorageSystem* rpc_storage = context; - rpc_system_storage_reset_state(rpc_storage, true); + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, true); /* use same message memory to send reponse */ PB_Main* response = malloc(sizeof(PB_Main)); @@ -284,17 +303,17 @@ static void rpc_system_storage_read_process(const PB_Main* request, void* contex if(result) { response->has_next = (size_left > 0); - rpc_send_and_release(rpc_storage->rpc, response); + rpc_send_and_release(session, response); } } while((size_left != 0) && result); if(!result) { rpc_send_and_release_empty( - rpc_storage->rpc, request->command_id, rpc_system_storage_get_file_error(file)); + session, request->command_id, rpc_system_storage_get_file_error(file)); } } else { rpc_send_and_release_empty( - rpc_storage->rpc, request->command_id, rpc_system_storage_get_file_error(file)); + session, request->command_id, rpc_system_storage_get_file_error(file)); } free(response); @@ -306,14 +325,18 @@ static void rpc_system_storage_read_process(const PB_Main* request, void* contex static void rpc_system_storage_write_process(const PB_Main* request, void* context) { furi_assert(request); + furi_assert(context); furi_assert(request->which_content == PB_Main_storage_write_request_tag); RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + bool result = true; if((request->command_id != rpc_storage->current_command_id) && (rpc_storage->state == RpcStorageStateWriting)) { - rpc_system_storage_reset_state(rpc_storage, true); + rpc_system_storage_reset_state(rpc_storage, session, true); } if(rpc_storage->state != RpcStorageStateWriting) { @@ -336,17 +359,15 @@ static void rpc_system_storage_write_process(const PB_Main* request, void* conte if(result && !request->has_next) { rpc_send_and_release_empty( - rpc_storage->rpc, rpc_storage->current_command_id, PB_CommandStatus_OK); - rpc_system_storage_reset_state(rpc_storage, false); + session, rpc_storage->current_command_id, PB_CommandStatus_OK); + rpc_system_storage_reset_state(rpc_storage, session, false); } } if(!result) { rpc_send_and_release_empty( - rpc_storage->rpc, - rpc_storage->current_command_id, - rpc_system_storage_get_file_error(file)); - rpc_system_storage_reset_state(rpc_storage, false); + session, rpc_storage->current_command_id, rpc_system_storage_get_file_error(file)); + rpc_system_storage_reset_state(rpc_storage, session, false); } } @@ -373,8 +394,11 @@ static void rpc_system_storage_delete_process(const PB_Main* request, void* cont furi_assert(request->which_content == PB_Main_storage_delete_request_tag); furi_assert(context); RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + PB_CommandStatus status = PB_CommandStatus_ERROR; - rpc_system_storage_reset_state(rpc_storage, true); + rpc_system_storage_reset_state(rpc_storage, session, true); Storage* fs_api = furi_record_open("storage"); @@ -400,7 +424,7 @@ static void rpc_system_storage_delete_process(const PB_Main* request, void* cont } furi_record_close("storage"); - rpc_send_and_release_empty(rpc_storage->rpc, request->command_id, status); + rpc_send_and_release_empty(session, request->command_id, status); } static void rpc_system_storage_mkdir_process(const PB_Main* request, void* context) { @@ -408,8 +432,11 @@ static void rpc_system_storage_mkdir_process(const PB_Main* request, void* conte furi_assert(request->which_content == PB_Main_storage_mkdir_request_tag); furi_assert(context); RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + PB_CommandStatus status; - rpc_system_storage_reset_state(rpc_storage, true); + rpc_system_storage_reset_state(rpc_storage, session, true); Storage* fs_api = furi_record_open("storage"); char* path = request->content.storage_mkdir_request.path; @@ -420,7 +447,7 @@ static void rpc_system_storage_mkdir_process(const PB_Main* request, void* conte status = PB_CommandStatus_ERROR_INVALID_PARAMETERS; } furi_record_close("storage"); - rpc_send_and_release_empty(rpc_storage->rpc, request->command_id, status); + rpc_send_and_release_empty(session, request->command_id, status); } static void rpc_system_storage_md5sum_process(const PB_Main* request, void* context) { @@ -428,12 +455,15 @@ static void rpc_system_storage_md5sum_process(const PB_Main* request, void* cont furi_assert(request->which_content == PB_Main_storage_md5sum_request_tag); furi_assert(context); RpcStorageSystem* rpc_storage = context; - rpc_system_storage_reset_state(rpc_storage, true); + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, true); const char* filename = request->content.storage_md5sum_request.path; if(!filename) { rpc_send_and_release_empty( - rpc_storage->rpc, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); + session, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); return; } @@ -474,10 +504,10 @@ static void rpc_system_storage_md5sum_process(const PB_Main* request, void* cont free(hash); free(data); storage_file_close(file); - rpc_send_and_release(rpc_storage->rpc, &response); + rpc_send_and_release(session, &response); } else { rpc_send_and_release_empty( - rpc_storage->rpc, request->command_id, rpc_system_storage_get_file_error(file)); + session, request->command_id, rpc_system_storage_get_file_error(file)); } storage_file_free(file); @@ -490,8 +520,11 @@ static void rpc_system_storage_rename_process(const PB_Main* request, void* cont furi_assert(request->which_content == PB_Main_storage_rename_request_tag); furi_assert(context); RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + PB_CommandStatus status; - rpc_system_storage_reset_state(rpc_storage, true); + rpc_system_storage_reset_state(rpc_storage, session, true); Storage* fs_api = furi_record_open("storage"); @@ -502,15 +535,15 @@ static void rpc_system_storage_rename_process(const PB_Main* request, void* cont status = rpc_system_storage_get_error(error); furi_record_close("storage"); - rpc_send_and_release_empty(rpc_storage->rpc, request->command_id, status); + rpc_send_and_release_empty(session, request->command_id, status); } -void* rpc_system_storage_alloc(Rpc* rpc) { - furi_assert(rpc); +void* rpc_system_storage_alloc(RpcSession* session) { + furi_assert(session); RpcStorageSystem* rpc_storage = malloc(sizeof(RpcStorageSystem)); rpc_storage->api = furi_record_open("storage"); - rpc_storage->rpc = rpc; + rpc_storage->session = session; rpc_storage->state = RpcStorageStateIdle; RpcHandler rpc_handler = { @@ -520,37 +553,40 @@ void* rpc_system_storage_alloc(Rpc* rpc) { }; rpc_handler.message_handler = rpc_system_storage_info_process; - rpc_add_handler(rpc, PB_Main_storage_info_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_info_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_stat_process; - rpc_add_handler(rpc, PB_Main_storage_stat_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_stat_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_list_process; - rpc_add_handler(rpc, PB_Main_storage_list_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_list_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_read_process; - rpc_add_handler(rpc, PB_Main_storage_read_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_read_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_write_process; - rpc_add_handler(rpc, PB_Main_storage_write_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_write_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_delete_process; - rpc_add_handler(rpc, PB_Main_storage_delete_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_delete_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_mkdir_process; - rpc_add_handler(rpc, PB_Main_storage_mkdir_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_mkdir_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_md5sum_process; - rpc_add_handler(rpc, PB_Main_storage_md5sum_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_md5sum_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_storage_rename_process; - rpc_add_handler(rpc, PB_Main_storage_rename_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_storage_rename_request_tag, &rpc_handler); return rpc_storage; } -void rpc_system_storage_free(void* ctx) { - RpcStorageSystem* rpc_storage = ctx; - rpc_system_storage_reset_state(rpc_storage, false); +void rpc_system_storage_free(void* context) { + RpcStorageSystem* rpc_storage = context; + RpcSession* session = rpc_storage->session; + furi_assert(session); + + rpc_system_storage_reset_state(rpc_storage, session, false); free(rpc_storage); } diff --git a/applications/rpc/rpc_system.c b/applications/rpc/rpc_system.c index e718ad8a..9884e30a 100644 --- a/applications/rpc/rpc_system.c +++ b/applications/rpc/rpc_system.c @@ -6,40 +6,42 @@ #include "rpc_i.h" -static void rpc_system_system_ping_process(const PB_Main* msg_request, void* context) { - furi_assert(msg_request); - furi_assert(msg_request->which_content == PB_Main_system_ping_request_tag); - furi_assert(context); - Rpc* rpc = context; +static void rpc_system_system_ping_process(const PB_Main* request, void* context) { + furi_assert(request); + furi_assert(request->which_content == PB_Main_system_ping_request_tag); - if(msg_request->has_next) { + RpcSession* session = (RpcSession*)context; + furi_assert(session); + + if(request->has_next) { rpc_send_and_release_empty( - rpc, msg_request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); + session, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); return; } - PB_Main msg_response = PB_Main_init_default; - msg_response.has_next = false; - msg_response.command_status = PB_CommandStatus_OK; - msg_response.command_id = msg_request->command_id; - msg_response.which_content = PB_Main_system_ping_response_tag; + PB_Main response = PB_Main_init_default; + response.has_next = false; + response.command_status = PB_CommandStatus_OK; + response.command_id = request->command_id; + response.which_content = PB_Main_system_ping_response_tag; - const PB_System_PingRequest* request = &msg_request->content.system_ping_request; - PB_System_PingResponse* response = &msg_response.content.system_ping_response; - if(request->data && (request->data->size > 0)) { - response->data = malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(request->data->size)); - memcpy(response->data->bytes, request->data->bytes, request->data->size); - response->data->size = request->data->size; + const PB_System_PingRequest* ping_request = &request->content.system_ping_request; + PB_System_PingResponse* ping_response = &response.content.system_ping_response; + if(ping_request->data && (ping_request->data->size > 0)) { + ping_response->data = malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(ping_request->data->size)); + memcpy(ping_response->data->bytes, ping_request->data->bytes, ping_request->data->size); + ping_response->data->size = ping_request->data->size; } - rpc_send_and_release(rpc, &msg_response); + rpc_send_and_release(session, &response); } static void rpc_system_system_reboot_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_reboot_request_tag); - furi_assert(context); - Rpc* rpc = context; + + RpcSession* session = (RpcSession*)context; + furi_assert(session); const int mode = request->content.system_reboot_request.mode; @@ -49,12 +51,12 @@ static void rpc_system_system_reboot_process(const PB_Main* request, void* conte power_reboot(PowerBootModeDfu); } else { rpc_send_and_release_empty( - rpc, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); + session, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); } } typedef struct { - Rpc* rpc; + RpcSession* session; PB_Main* response; } RpcSystemSystemDeviceInfoContext; @@ -65,7 +67,6 @@ static void rpc_system_system_device_info_callback( void* context) { furi_assert(key); furi_assert(value); - furi_assert(context); RpcSystemSystemDeviceInfoContext* ctx = context; char* str_key = strdup(key); @@ -75,14 +76,15 @@ static void rpc_system_system_device_info_callback( ctx->response->content.system_device_info_response.key = str_key; ctx->response->content.system_device_info_response.value = str_value; - rpc_send_and_release(ctx->rpc, ctx->response); + rpc_send_and_release(ctx->session, ctx->response); } static void rpc_system_system_device_info_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_device_info_request_tag); - furi_assert(context); - Rpc* rpc = context; + + RpcSession* session = (RpcSession*)context; + furi_assert(session); PB_Main* response = malloc(sizeof(PB_Main)); response->command_id = request->command_id; @@ -90,10 +92,9 @@ static void rpc_system_system_device_info_process(const PB_Main* request, void* response->command_status = PB_CommandStatus_OK; RpcSystemSystemDeviceInfoContext device_info_context = { - .rpc = rpc, + .session = session, .response = response, }; - furi_hal_info_get(rpc_system_system_device_info_callback, &device_info_context); free(response); @@ -102,8 +103,9 @@ static void rpc_system_system_device_info_process(const PB_Main* request, void* static void rpc_system_system_get_datetime_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_get_datetime_request_tag); - furi_assert(context); - Rpc* rpc = context; + + RpcSession* session = (RpcSession*)context; + furi_assert(session); FuriHalRtcDateTime datetime; furi_hal_rtc_get_datetime(&datetime); @@ -121,19 +123,20 @@ static void rpc_system_system_get_datetime_process(const PB_Main* request, void* response->content.system_get_datetime_response.datetime.year = datetime.year; response->content.system_get_datetime_response.datetime.weekday = datetime.weekday; - rpc_send_and_release(rpc, response); + rpc_send_and_release(session, response); free(response); } static void rpc_system_system_set_datetime_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_set_datetime_request_tag); - furi_assert(context); - Rpc* rpc = context; + + RpcSession* session = (RpcSession*)context; + furi_assert(session); if(!request->content.system_set_datetime_request.has_datetime) { rpc_send_and_release_empty( - rpc, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); + session, request->command_id, PB_CommandStatus_ERROR_INVALID_PARAMETERS); return; } @@ -147,38 +150,43 @@ static void rpc_system_system_set_datetime_process(const PB_Main* request, void* datetime.weekday = request->content.system_set_datetime_request.datetime.weekday; furi_hal_rtc_set_datetime(&datetime); - rpc_send_and_release_empty(rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void rpc_system_system_factory_reset_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_factory_reset_request_tag); - furi_assert(context); + + RpcSession* session = (RpcSession*)context; + furi_assert(session); furi_hal_rtc_set_flag(FuriHalRtcFlagFactoryReset); power_reboot(PowerBootModeNormal); + + (void)session; } static void rpc_system_system_play_audiovisual_alert_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_play_audiovisual_alert_request_tag); - furi_assert(context); - Rpc* rpc = context; + + RpcSession* session = (RpcSession*)context; + furi_assert(session); NotificationApp* notification = furi_record_open("notification"); notification_message(notification, &sequence_audiovisual_alert); furi_record_close("notification"); - rpc_send_and_release_empty(rpc, request->command_id, PB_CommandStatus_OK); + rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK); } static void rpc_system_system_protobuf_version_process(const PB_Main* request, void* context) { furi_assert(request); furi_assert(request->which_content == PB_Main_system_protobuf_version_request_tag); - furi_assert(context); - Rpc* rpc = context; + RpcSession* session = (RpcSession*)context; + furi_assert(session); PB_Main* response = malloc(sizeof(PB_Main)); response->command_id = request->command_id; @@ -190,40 +198,40 @@ static void rpc_system_system_protobuf_version_process(const PB_Main* request, v response->content.system_protobuf_version_response.major = PROTOBUF_MAJOR_VERSION; response->content.system_protobuf_version_response.minor = PROTOBUF_MINOR_VERSION; - rpc_send_and_release(rpc, response); + rpc_send_and_release(session, response); free(response); } -void* rpc_system_system_alloc(Rpc* rpc) { +void* rpc_system_system_alloc(RpcSession* session) { RpcHandler rpc_handler = { .message_handler = NULL, .decode_submessage = NULL, - .context = rpc, + .context = session, }; rpc_handler.message_handler = rpc_system_system_ping_process; - rpc_add_handler(rpc, PB_Main_system_ping_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_ping_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_reboot_process; - rpc_add_handler(rpc, PB_Main_system_reboot_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_reboot_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_device_info_process; - rpc_add_handler(rpc, PB_Main_system_device_info_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_device_info_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_factory_reset_process; - rpc_add_handler(rpc, PB_Main_system_factory_reset_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_factory_reset_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_get_datetime_process; - rpc_add_handler(rpc, PB_Main_system_get_datetime_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_get_datetime_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_set_datetime_process; - rpc_add_handler(rpc, PB_Main_system_set_datetime_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_set_datetime_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_play_audiovisual_alert_process; - rpc_add_handler(rpc, PB_Main_system_play_audiovisual_alert_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_play_audiovisual_alert_request_tag, &rpc_handler); rpc_handler.message_handler = rpc_system_system_protobuf_version_process; - rpc_add_handler(rpc, PB_Main_system_protobuf_version_request_tag, &rpc_handler); + rpc_add_handler(session, PB_Main_system_protobuf_version_request_tag, &rpc_handler); return NULL; } diff --git a/applications/tests/rpc/rpc_test.c b/applications/tests/rpc/rpc_test.c index e80d36cb..cdb63053 100644 --- a/applications/tests/rpc/rpc_test.c +++ b/applications/tests/rpc/rpc_test.c @@ -24,21 +24,23 @@ LIST_DEF(MsgList, PB_Main, M_POD_OPLIST) #define M_OPL_MsgList_t() LIST_OPLIST(MsgList) +#define TEST_RPC_SESSIONS 2 + /* MinUnit test framework doesn't allow passing context into tests, * so we have to use global variables */ static Rpc* rpc = NULL; -static RpcSession* session = NULL; static uint32_t command_id = 0; typedef struct { + RpcSession* session; StreamBufferHandle_t output_stream; SemaphoreHandle_t close_session_semaphore; SemaphoreHandle_t terminate_semaphore; TickType_t timeout; } RpcSessionContext; -static RpcSessionContext rpc_session_context; +static RpcSessionContext rpc_session[TEST_RPC_SESSIONS]; #define TAG "UnitTestsRpc" #define MAX_RECEIVE_OUTPUT_TIMEOUT 3000 @@ -69,48 +71,83 @@ static void output_bytes_callback(void* ctx, uint8_t* got_bytes, size_t got_size static void clean_directory(Storage* fs_api, const char* clean_dir); static void test_rpc_add_empty_to_list(MsgList_t msg_list, PB_CommandStatus status, uint32_t command_id); -static void test_rpc_encode_and_feed(MsgList_t msg_list); -static void test_rpc_encode_and_feed_one(PB_Main* request); +static void test_rpc_encode_and_feed(MsgList_t msg_list, uint8_t session); +static void test_rpc_encode_and_feed_one(PB_Main* request, uint8_t session); static void test_rpc_compare_messages(PB_Main* result, PB_Main* expected); -static void test_rpc_decode_and_compare(MsgList_t expected_msg_list); +static void test_rpc_decode_and_compare(MsgList_t expected_msg_list, uint8_t session); static void test_rpc_free_msg_list(MsgList_t msg_list); static void test_rpc_session_close_callback(void* context); static void test_rpc_session_terminated_callback(void* context); static void test_rpc_setup(void) { furi_check(!rpc); - furi_check(!session); + furi_check(!(rpc_session[0].session)); rpc = furi_record_open("rpc"); - for(int i = 0; !session && (i < 10000); ++i) { - session = rpc_session_open(rpc); + for(int i = 0; !(rpc_session[0].session) && (i < 10000); ++i) { + rpc_session[0].session = rpc_session_open(rpc); delay(1); } - furi_check(session); + furi_check(rpc_session[0].session); - rpc_session_context.output_stream = xStreamBufferCreate(1000, 1); - rpc_session_set_send_bytes_callback(session, output_bytes_callback); - rpc_session_context.close_session_semaphore = xSemaphoreCreateBinary(); - rpc_session_context.terminate_semaphore = xSemaphoreCreateBinary(); - rpc_session_set_close_callback(session, test_rpc_session_close_callback); - rpc_session_set_terminated_callback(session, test_rpc_session_terminated_callback); - rpc_session_set_context(session, &rpc_session_context); + rpc_session[0].output_stream = xStreamBufferCreate(1000, 1); + rpc_session_set_send_bytes_callback(rpc_session[0].session, output_bytes_callback); + rpc_session[0].close_session_semaphore = xSemaphoreCreateBinary(); + rpc_session[0].terminate_semaphore = xSemaphoreCreateBinary(); + rpc_session_set_close_callback(rpc_session[0].session, test_rpc_session_close_callback); + rpc_session_set_terminated_callback( + rpc_session[0].session, test_rpc_session_terminated_callback); + rpc_session_set_context(rpc_session[0].session, &rpc_session[0]); +} + +static void test_rpc_setup_second_session(void) { + furi_check(rpc); + furi_check(!(rpc_session[1].session)); + + for(int i = 0; !(rpc_session[1].session) && (i < 10000); ++i) { + rpc_session[1].session = rpc_session_open(rpc); + delay(1); + } + furi_check(rpc_session[1].session); + + rpc_session[1].output_stream = xStreamBufferCreate(1000, 1); + rpc_session_set_send_bytes_callback(rpc_session[1].session, output_bytes_callback); + rpc_session[1].close_session_semaphore = xSemaphoreCreateBinary(); + rpc_session[1].terminate_semaphore = xSemaphoreCreateBinary(); + rpc_session_set_close_callback(rpc_session[1].session, test_rpc_session_close_callback); + rpc_session_set_terminated_callback( + rpc_session[1].session, test_rpc_session_terminated_callback); + rpc_session_set_context(rpc_session[1].session, &rpc_session[1]); } static void test_rpc_teardown(void) { - furi_check(rpc_session_context.close_session_semaphore); - xSemaphoreTake(rpc_session_context.terminate_semaphore, 0); - rpc_session_close(session); - furi_check(xSemaphoreTake(rpc_session_context.terminate_semaphore, portMAX_DELAY)); + furi_check(rpc_session[0].close_session_semaphore); + xSemaphoreTake(rpc_session[0].terminate_semaphore, 0); + rpc_session_close(rpc_session[0].session); + furi_check(xSemaphoreTake(rpc_session[0].terminate_semaphore, portMAX_DELAY)); furi_record_close("rpc"); - vStreamBufferDelete(rpc_session_context.output_stream); - vSemaphoreDelete(rpc_session_context.close_session_semaphore); - vSemaphoreDelete(rpc_session_context.terminate_semaphore); + vStreamBufferDelete(rpc_session[0].output_stream); + vSemaphoreDelete(rpc_session[0].close_session_semaphore); + vSemaphoreDelete(rpc_session[0].terminate_semaphore); ++command_id; - rpc_session_context.output_stream = NULL; - rpc_session_context.close_session_semaphore = NULL; + rpc_session[0].output_stream = NULL; + rpc_session[0].close_session_semaphore = NULL; rpc = NULL; - session = NULL; + rpc_session[0].session = NULL; +} + +static void test_rpc_teardown_second_session(void) { + furi_check(rpc_session[1].close_session_semaphore); + xSemaphoreTake(rpc_session[1].terminate_semaphore, 0); + rpc_session_close(rpc_session[1].session); + furi_check(xSemaphoreTake(rpc_session[1].terminate_semaphore, portMAX_DELAY)); + vStreamBufferDelete(rpc_session[1].output_stream); + vSemaphoreDelete(rpc_session[1].close_session_semaphore); + vSemaphoreDelete(rpc_session[1].terminate_semaphore); + ++command_id; + rpc_session[1].output_stream = NULL; + rpc_session[1].close_session_semaphore = NULL; + rpc_session[1].session = NULL; } static void test_rpc_storage_setup(void) { @@ -334,8 +371,9 @@ static void test_rpc_add_read_or_write_to_list( } while(pattern_repeats); } -static void test_rpc_encode_and_feed_one(PB_Main* request) { +static void test_rpc_encode_and_feed_one(PB_Main* request, uint8_t session) { furi_check(request); + furi_check(session < TEST_RPC_SESSIONS); pb_ostream_t ostream = PB_OSTREAM_SIZING; @@ -350,7 +388,8 @@ static void test_rpc_encode_and_feed_one(PB_Main* request) { size_t bytes_left = ostream.bytes_written; uint8_t* buffer_ptr = buffer; do { - size_t bytes_sent = rpc_session_feed(session, buffer_ptr, bytes_left, 1000); + size_t bytes_sent = + rpc_session_feed(rpc_session[session].session, buffer_ptr, bytes_left, 1000); mu_check(bytes_sent > 0); bytes_left -= bytes_sent; @@ -361,11 +400,11 @@ static void test_rpc_encode_and_feed_one(PB_Main* request) { pb_release(&PB_Main_msg, request); } -static void test_rpc_encode_and_feed(MsgList_t msg_list) { +static void test_rpc_encode_and_feed(MsgList_t msg_list, uint8_t session) { MsgList_reverse(msg_list); for M_EACH(request, msg_list, MsgList_t) { - test_rpc_encode_and_feed_one(request); + test_rpc_encode_and_feed_one(request, session); } MsgList_reverse(msg_list); } @@ -585,13 +624,14 @@ static void test_rpc_storage_list_create_expected_list( furi_record_close("storage"); } -static void test_rpc_decode_and_compare(MsgList_t expected_msg_list) { +static void test_rpc_decode_and_compare(MsgList_t expected_msg_list, uint8_t session) { furi_check(!MsgList_empty_p(expected_msg_list)); + furi_check(session < TEST_RPC_SESSIONS); - rpc_session_context.timeout = xTaskGetTickCount() + MAX_RECEIVE_OUTPUT_TIMEOUT; + rpc_session[session].timeout = xTaskGetTickCount() + MAX_RECEIVE_OUTPUT_TIMEOUT; pb_istream_t istream = { .callback = test_rpc_pb_stream_read, - .state = &rpc_session_context, + .state = &rpc_session[session], .errmsg = NULL, .bytes_left = 0x7FFFFFFF, }; @@ -612,7 +652,7 @@ static void test_rpc_decode_and_compare(MsgList_t expected_msg_list) { pb_release(&PB_Main_msg, &result); } - rpc_session_context.timeout = xTaskGetTickCount() + 50; + rpc_session[session].timeout = xTaskGetTickCount() + 50; if(pb_decode_ex(&istream, &PB_Main_msg, &result, PB_DECODE_DELIMITED)) { mu_fail("decoded more than expected"); } @@ -638,8 +678,8 @@ static void test_rpc_storage_list_run(const char* path, uint32_t command_id) { } else { test_rpc_storage_list_create_expected_list(expected_msg_list, path, command_id); } - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -723,8 +763,8 @@ static void test_storage_read_run(const char* path, uint32_t command_id) { test_rpc_add_read_to_list_by_reading_real_file(expected_msg_list, path, command_id); test_rpc_create_simple_message(&request, PB_Main_storage_read_request_tag, path, command_id); - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -796,8 +836,8 @@ static void test_rpc_storage_info_run(const char* path, uint32_t command_id) { response->which_content = PB_Main_empty_tag; } - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -830,8 +870,8 @@ static void test_rpc_storage_stat_run(const char* path, uint32_t command_id) { response->content.storage_stat_response.file.size = fileinfo.size; } - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -895,8 +935,8 @@ static void test_storage_write_run( test_rpc_add_read_or_write_to_list( input_msg_list, WRITE_REQUEST, path, buf, write_size, write_count, command_id); test_rpc_add_empty_to_list(expected_msg_list, status, command_id); - test_rpc_encode_and_feed(input_msg_list); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed(input_msg_list, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(input_msg_list); test_rpc_free_msg_list(expected_msg_list); @@ -933,8 +973,8 @@ static void test_storage_write_read_run( test_rpc_print_message_list(input_msg_list); test_rpc_print_message_list(expected_msg_list); - test_rpc_encode_and_feed(input_msg_list); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed(input_msg_list, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(input_msg_list); test_rpc_free_msg_list(expected_msg_list); @@ -1007,8 +1047,8 @@ MU_TEST(test_storage_interrupt_continuous_same_system) { expected_msg_list, PB_CommandStatus_ERROR_CONTINUOUS_COMMAND_INTERRUPTED, command_id); test_rpc_add_empty_to_list(expected_msg_list, PB_CommandStatus_OK, command_id + 1); - test_rpc_encode_and_feed(input_msg_list); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed(input_msg_list, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(input_msg_list); test_rpc_free_msg_list(expected_msg_list); @@ -1057,8 +1097,8 @@ MU_TEST(test_storage_interrupt_continuous_another_system) { test_rpc_add_empty_to_list(expected_msg_list, PB_CommandStatus_OK, command_id); test_rpc_add_empty_to_list(expected_msg_list, PB_CommandStatus_OK, command_id + 2); - test_rpc_encode_and_feed(input_msg_list); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed(input_msg_list, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(input_msg_list); test_rpc_free_msg_list(expected_msg_list); @@ -1077,8 +1117,8 @@ static void test_storage_delete_run( request.content.storage_delete_request.recursive = recursive; test_rpc_add_empty_to_list(expected_msg_list, status, command_id); - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1157,8 +1197,8 @@ static void test_storage_mkdir_run(const char* path, size_t command_id, PB_Comma test_rpc_create_simple_message(&request, PB_Main_storage_mkdir_request_tag, path, command_id); test_rpc_add_empty_to_list(expected_msg_list, status, command_id); - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1233,8 +1273,8 @@ static void test_storage_md5sum_run( test_rpc_add_empty_to_list(expected_msg_list, status, command_id); } - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1292,8 +1332,8 @@ static void test_rpc_storage_rename_run( test_rpc_add_empty_to_list(expected_msg_list, status, command_id); - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1340,8 +1380,8 @@ MU_TEST(test_ping) { test_rpc_add_ping_to_list(expected_msg_list, PING_RESPONSE, 700); test_rpc_add_ping_to_list(expected_msg_list, PING_RESPONSE, 1); - test_rpc_encode_and_feed(input_msg_list); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed(input_msg_list, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(input_msg_list); test_rpc_free_msg_list(expected_msg_list); @@ -1367,8 +1407,8 @@ MU_TEST(test_system_protobuf_version) { response->content.system_protobuf_version_response.major = PROTOBUF_MAJOR_VERSION; response->content.system_protobuf_version_response.minor = PROTOBUF_MINOR_VERSION; - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); test_rpc_free_msg_list(expected_msg_list); } @@ -1394,7 +1434,7 @@ MU_TEST_SUITE(test_rpc_storage) { MU_RUN_TEST(test_storage_mkdir); MU_RUN_TEST(test_storage_md5sum); MU_RUN_TEST(test_storage_rename); - // TODO: repair test + DISABLE_TEST(MU_RUN_TEST(test_storage_interrupt_continuous_same_system);); MU_RUN_TEST(test_storage_interrupt_continuous_another_system); } @@ -1437,8 +1477,8 @@ static void test_app_start_run( test_app_create_request(&request, app_name, app_args, command_id); test_rpc_add_empty_to_list(expected_msg_list, status, command_id); - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1461,8 +1501,8 @@ static void test_app_get_status_lock_run(bool locked_expected, uint32_t command_ response->has_next = false; response->content.app_lock_status_response.locked = locked_expected; - test_rpc_encode_and_feed_one(&request); - test_rpc_decode_and_compare(expected_msg_list); + test_rpc_encode_and_feed_one(&request, 0); + test_rpc_decode_and_compare(expected_msg_list, 0); pb_release(&PB_Main_msg, &request); test_rpc_free_msg_list(expected_msg_list); @@ -1516,7 +1556,7 @@ static void buf[i] = pattern[i % pattern_size]; } - size_t bytes_sent = rpc_session_feed(session, buf, size, 1000); + size_t bytes_sent = rpc_session_feed(rpc_session[0].session, buf, size, 1000); furi_check(bytes_sent == size); free(buf); } @@ -1532,12 +1572,12 @@ static void test_rpc_feed_rubbish_run( test_rpc_add_empty_to_list(expected, PB_CommandStatus_ERROR_DECODE, 0); - furi_check(!xSemaphoreTake(rpc_session_context.close_session_semaphore, 0)); - test_rpc_encode_and_feed(input_before); - test_send_rubbish(session, pattern, pattern_size, size); - test_rpc_encode_and_feed(input_after); + furi_check(!xSemaphoreTake(rpc_session[0].close_session_semaphore, 0)); + test_rpc_encode_and_feed(input_before, 0); + test_send_rubbish(rpc_session[0].session, pattern, pattern_size, size); + test_rpc_encode_and_feed(input_after, 0); - test_rpc_decode_and_compare(expected); + test_rpc_decode_and_compare(expected, 0); test_rpc_teardown(); } @@ -1619,8 +1659,112 @@ MU_TEST(test_rpc_feed_rubbish) { FREE_LISTS(); } +MU_TEST(test_rpc_multisession_ping) { + MsgList_t input_0; + MsgList_init(input_0); + MsgList_t input_1; + MsgList_init(input_1); + MsgList_t expected_0; + MsgList_init(expected_0); + MsgList_t expected_1; + MsgList_init(expected_1); + + test_rpc_setup(); + + test_rpc_setup_second_session(); + test_rpc_teardown_second_session(); + + test_rpc_setup_second_session(); + + test_rpc_add_ping_to_list(input_0, PING_REQUEST, 0); + test_rpc_add_ping_to_list(input_1, PING_REQUEST, 1); + test_rpc_add_ping_to_list(expected_0, PING_RESPONSE, 0); + test_rpc_add_ping_to_list(expected_1, PING_RESPONSE, 1); + + test_rpc_encode_and_feed(input_0, 0); + test_rpc_encode_and_feed(input_1, 1); + test_rpc_decode_and_compare(expected_0, 0); + test_rpc_decode_and_compare(expected_1, 1); + + test_rpc_free_msg_list(input_0); + test_rpc_free_msg_list(input_1); + test_rpc_free_msg_list(expected_0); + test_rpc_free_msg_list(expected_1); + + test_rpc_teardown_second_session(); + test_rpc_teardown(); +} + +MU_TEST(test_rpc_multisession_storage) { + MsgList_t input_0; + MsgList_init(input_0); + MsgList_t input_1; + MsgList_init(input_1); + MsgList_t expected_0; + MsgList_init(expected_0); + MsgList_t expected_1; + MsgList_init(expected_1); + + test_rpc_storage_setup(); + test_rpc_setup_second_session(); + + uint8_t pattern[16] = "0123456789abcdef"; + + test_rpc_add_read_or_write_to_list( + input_0, WRITE_REQUEST, TEST_DIR "file0.txt", pattern, sizeof(pattern), 1, ++command_id); + test_rpc_add_empty_to_list(expected_0, PB_CommandStatus_OK, command_id); + + test_rpc_add_read_or_write_to_list( + input_1, WRITE_REQUEST, TEST_DIR "file1.txt", pattern, sizeof(pattern), 1, ++command_id); + test_rpc_add_empty_to_list(expected_1, PB_CommandStatus_OK, command_id); + + test_rpc_create_simple_message( + MsgList_push_raw(input_0), + PB_Main_storage_read_request_tag, + TEST_DIR "file0.txt", + ++command_id); + test_rpc_add_read_or_write_to_list( + expected_0, READ_RESPONSE, TEST_DIR "file0.txt", pattern, sizeof(pattern), 1, command_id); + + test_rpc_create_simple_message( + MsgList_push_raw(input_1), + PB_Main_storage_read_request_tag, + TEST_DIR "file1.txt", + ++command_id); + test_rpc_add_read_or_write_to_list( + expected_1, READ_RESPONSE, TEST_DIR "file1.txt", pattern, sizeof(pattern), 1, command_id); + + test_rpc_print_message_list(input_0); + test_rpc_print_message_list(input_1); + test_rpc_print_message_list(expected_0); + test_rpc_print_message_list(expected_1); + + test_rpc_encode_and_feed(input_0, 0); + test_rpc_encode_and_feed(input_1, 1); + + test_rpc_decode_and_compare(expected_0, 0); + test_rpc_decode_and_compare(expected_1, 1); + + test_rpc_free_msg_list(input_0); + test_rpc_free_msg_list(input_1); + test_rpc_free_msg_list(expected_0); + test_rpc_free_msg_list(expected_1); + + test_rpc_teardown_second_session(); + test_rpc_storage_teardown(); +} + MU_TEST_SUITE(test_rpc_session) { MU_RUN_TEST(test_rpc_feed_rubbish); + MU_RUN_TEST(test_rpc_multisession_ping); + + Storage* storage = furi_record_open("storage"); + if(storage_sd_status(storage) != FSE_OK) { + FURI_LOG_E(TAG, "SD card not mounted - skip storage tests"); + } else { + MU_RUN_TEST(test_rpc_multisession_storage); + } + furi_record_close("storage"); } int run_minunit_test_rpc() { @@ -1631,7 +1775,6 @@ int run_minunit_test_rpc() { MU_RUN_SUITE(test_rpc_storage); } furi_record_close("storage"); - MU_RUN_SUITE(test_rpc_system); MU_RUN_SUITE(test_rpc_app); MU_RUN_SUITE(test_rpc_session);