From da6e31b2bf66699053387772d94fdf04963ac719 Mon Sep 17 00:00:00 2001 From: Nikolay Minaylov Date: Thu, 24 Feb 2022 15:08:58 +0300 Subject: [PATCH] [FL-2242] RPC: Wait for session termination in unit tests (#1005) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * rpc: session termination callback * grammar fixes Co-authored-by: あく --- applications/rpc/rpc.c | 19 +++++++++++++++++++ applications/rpc/rpc.h | 12 ++++++++++++ applications/tests/rpc/rpc_test.c | 14 ++++++++++++++ 3 files changed, 45 insertions(+) mode change 100755 => 100644 applications/rpc/rpc.h diff --git a/applications/rpc/rpc.c b/applications/rpc/rpc.c index f4dfc31f..7bdd9ab5 100644 --- a/applications/rpc/rpc.c +++ b/applications/rpc/rpc.c @@ -54,6 +54,7 @@ struct RpcSession { RpcSendBytesCallback send_bytes_callback; RpcBufferIsEmptyCallback buffer_is_empty_callback; RpcSessionClosedCallback closed_callback; + RpcSessionTerminatedCallback terminated_callback; void* context; osMutexId_t callbacks_mutex; Rpc* rpc; @@ -429,6 +430,7 @@ static void rpc_free_session(RpcSession* session) { session->closed_callback = NULL; session->send_bytes_callback = NULL; session->buffer_is_empty_callback = NULL; + session->terminated_callback = NULL; } void rpc_session_set_context(RpcSession* session, void* context) { @@ -472,6 +474,18 @@ void rpc_session_set_buffer_is_empty_callback( osMutexRelease(session->callbacks_mutex); } +void rpc_session_set_terminated_callback( + RpcSession* session, + RpcSessionTerminatedCallback callback) { + furi_assert(session); + furi_assert(session->rpc); + furi_assert(session->rpc->busy); + + osMutexAcquire(session->callbacks_mutex, osWaitForever); + session->terminated_callback = callback; + osMutexRelease(session->callbacks_mutex); +} + /* Doesn't forbid using rpc_feed_bytes() after session close - it's safe. * Because any bytes received in buffer will be flushed before next session. * If bytes get into stream buffer before it's get epmtified and this @@ -665,6 +679,11 @@ int32_t rpc_srv(void* p) { if(rpc->session.terminate) { FURI_LOG_D(TAG, "Session terminated"); + osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever); + if(rpc->session.terminated_callback) { + rpc->session.terminated_callback(rpc->session.context); + } + osMutexRelease(rpc->session.callbacks_mutex); osEventFlagsClear(rpc->events, RPC_EVENTS_ALL); rpc_free_session(&rpc->session); rpc->busy = false; diff --git a/applications/rpc/rpc.h b/applications/rpc/rpc.h old mode 100755 new mode 100644 index 2b53c9d5..38dd9af3 --- a/applications/rpc/rpc.h +++ b/applications/rpc/rpc.h @@ -21,6 +21,9 @@ typedef void (*RpcBufferIsEmptyCallback)(void* context); * is received. Any other actions lays on transport layer. * No destruction or session close preformed. */ typedef void (*RpcSessionClosedCallback)(void* context); +/** Callback to notify transport layer that session was closed + * and all operations were finished */ +typedef void (*RpcSessionTerminatedCallback)(void* context); /** Open RPC session * @@ -82,6 +85,15 @@ void rpc_session_set_buffer_is_empty_callback( */ void rpc_session_set_close_callback(RpcSession* session, RpcSessionClosedCallback callback); +/** Set callback to be called when RPC session is closed + * + * @param session pointer to RpcSession descriptor + * @param callback callback to inform about RPC session state + */ +void rpc_session_set_terminated_callback( + RpcSession* session, + RpcSessionTerminatedCallback callback); + /** Give bytes to RPC service to decode them and perform command * * @param session pointer to RpcSession descriptor diff --git a/applications/tests/rpc/rpc_test.c b/applications/tests/rpc/rpc_test.c index b51a1278..25f641a5 100644 --- a/applications/tests/rpc/rpc_test.c +++ b/applications/tests/rpc/rpc_test.c @@ -34,6 +34,7 @@ static uint32_t command_id = 0; typedef struct { StreamBufferHandle_t output_stream; SemaphoreHandle_t close_session_semaphore; + SemaphoreHandle_t terminate_semaphore; TickType_t timeout; } RpcSessionContext; @@ -74,6 +75,7 @@ static void test_rpc_compare_messages(PB_Main* result, PB_Main* expected); static void test_rpc_decode_and_compare(MsgList_t expected_msg_list); static void test_rpc_free_msg_list(MsgList_t msg_list); static void test_rpc_session_close_callback(void* context); +static void test_rpc_session_terminated_callback(void* context); static void test_rpc_setup(void) { furi_check(!rpc); @@ -89,16 +91,21 @@ static void test_rpc_setup(void) { rpc_session_context.output_stream = xStreamBufferCreate(1000, 1); rpc_session_set_send_bytes_callback(session, output_bytes_callback); rpc_session_context.close_session_semaphore = xSemaphoreCreateBinary(); + rpc_session_context.terminate_semaphore = xSemaphoreCreateBinary(); rpc_session_set_close_callback(session, test_rpc_session_close_callback); + rpc_session_set_terminated_callback(session, test_rpc_session_terminated_callback); rpc_session_set_context(session, &rpc_session_context); } static void test_rpc_teardown(void) { furi_check(rpc_session_context.close_session_semaphore); + xSemaphoreTake(rpc_session_context.terminate_semaphore, 0); rpc_session_close(session); + furi_check(xSemaphoreTake(rpc_session_context.terminate_semaphore, portMAX_DELAY)); furi_record_close("rpc"); vStreamBufferDelete(rpc_session_context.output_stream); vSemaphoreDelete(rpc_session_context.close_session_semaphore); + vSemaphoreDelete(rpc_session_context.terminate_semaphore); ++command_id; rpc_session_context.output_stream = NULL; rpc_session_context.close_session_semaphore = NULL; @@ -129,6 +136,13 @@ static void test_rpc_session_close_callback(void* context) { xSemaphoreGive(callbacks_context->close_session_semaphore); } +static void test_rpc_session_terminated_callback(void* context) { + furi_check(context); + RpcSessionContext* callbacks_context = context; + + xSemaphoreGive(callbacks_context->terminate_semaphore); +} + static void clean_directory(Storage* fs_api, const char* clean_dir) { furi_check(fs_api); furi_check(clean_dir);