[FL-2242] RPC: Wait for session termination in unit tests (#1005)

* rpc: session termination callback
* grammar fixes

Co-authored-by: あく <alleteam@gmail.com>
This commit is contained in:
Nikolay Minaylov 2022-02-24 15:08:58 +03:00 committed by GitHub
parent 24987b95cd
commit da6e31b2bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 0 deletions

View File

@ -54,6 +54,7 @@ struct RpcSession {
RpcSendBytesCallback send_bytes_callback; RpcSendBytesCallback send_bytes_callback;
RpcBufferIsEmptyCallback buffer_is_empty_callback; RpcBufferIsEmptyCallback buffer_is_empty_callback;
RpcSessionClosedCallback closed_callback; RpcSessionClosedCallback closed_callback;
RpcSessionTerminatedCallback terminated_callback;
void* context; void* context;
osMutexId_t callbacks_mutex; osMutexId_t callbacks_mutex;
Rpc* rpc; Rpc* rpc;
@ -429,6 +430,7 @@ static void rpc_free_session(RpcSession* session) {
session->closed_callback = NULL; session->closed_callback = NULL;
session->send_bytes_callback = NULL; session->send_bytes_callback = NULL;
session->buffer_is_empty_callback = NULL; session->buffer_is_empty_callback = NULL;
session->terminated_callback = NULL;
} }
void rpc_session_set_context(RpcSession* session, void* context) { void rpc_session_set_context(RpcSession* session, void* context) {
@ -472,6 +474,18 @@ void rpc_session_set_buffer_is_empty_callback(
osMutexRelease(session->callbacks_mutex); osMutexRelease(session->callbacks_mutex);
} }
void rpc_session_set_terminated_callback(
RpcSession* session,
RpcSessionTerminatedCallback callback) {
furi_assert(session);
furi_assert(session->rpc);
furi_assert(session->rpc->busy);
osMutexAcquire(session->callbacks_mutex, osWaitForever);
session->terminated_callback = callback;
osMutexRelease(session->callbacks_mutex);
}
/* Doesn't forbid using rpc_feed_bytes() after session close - it's safe. /* Doesn't forbid using rpc_feed_bytes() after session close - it's safe.
* Because any bytes received in buffer will be flushed before next session. * Because any bytes received in buffer will be flushed before next session.
* If bytes get into stream buffer before it's get epmtified and this * If bytes get into stream buffer before it's get epmtified and this
@ -665,6 +679,11 @@ int32_t rpc_srv(void* p) {
if(rpc->session.terminate) { if(rpc->session.terminate) {
FURI_LOG_D(TAG, "Session terminated"); FURI_LOG_D(TAG, "Session terminated");
osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever);
if(rpc->session.terminated_callback) {
rpc->session.terminated_callback(rpc->session.context);
}
osMutexRelease(rpc->session.callbacks_mutex);
osEventFlagsClear(rpc->events, RPC_EVENTS_ALL); osEventFlagsClear(rpc->events, RPC_EVENTS_ALL);
rpc_free_session(&rpc->session); rpc_free_session(&rpc->session);
rpc->busy = false; rpc->busy = false;

12
applications/rpc/rpc.h Executable file → Normal file
View File

@ -21,6 +21,9 @@ typedef void (*RpcBufferIsEmptyCallback)(void* context);
* is received. Any other actions lays on transport layer. * is received. Any other actions lays on transport layer.
* No destruction or session close preformed. */ * No destruction or session close preformed. */
typedef void (*RpcSessionClosedCallback)(void* context); typedef void (*RpcSessionClosedCallback)(void* context);
/** Callback to notify transport layer that session was closed
* and all operations were finished */
typedef void (*RpcSessionTerminatedCallback)(void* context);
/** Open RPC session /** Open RPC session
* *
@ -82,6 +85,15 @@ void rpc_session_set_buffer_is_empty_callback(
*/ */
void rpc_session_set_close_callback(RpcSession* session, RpcSessionClosedCallback callback); void rpc_session_set_close_callback(RpcSession* session, RpcSessionClosedCallback callback);
/** Set callback to be called when RPC session is closed
*
* @param session pointer to RpcSession descriptor
* @param callback callback to inform about RPC session state
*/
void rpc_session_set_terminated_callback(
RpcSession* session,
RpcSessionTerminatedCallback callback);
/** Give bytes to RPC service to decode them and perform command /** Give bytes to RPC service to decode them and perform command
* *
* @param session pointer to RpcSession descriptor * @param session pointer to RpcSession descriptor

View File

@ -34,6 +34,7 @@ static uint32_t command_id = 0;
typedef struct { typedef struct {
StreamBufferHandle_t output_stream; StreamBufferHandle_t output_stream;
SemaphoreHandle_t close_session_semaphore; SemaphoreHandle_t close_session_semaphore;
SemaphoreHandle_t terminate_semaphore;
TickType_t timeout; TickType_t timeout;
} RpcSessionContext; } RpcSessionContext;
@ -74,6 +75,7 @@ static void test_rpc_compare_messages(PB_Main* result, PB_Main* expected);
static void test_rpc_decode_and_compare(MsgList_t expected_msg_list); static void test_rpc_decode_and_compare(MsgList_t expected_msg_list);
static void test_rpc_free_msg_list(MsgList_t msg_list); static void test_rpc_free_msg_list(MsgList_t msg_list);
static void test_rpc_session_close_callback(void* context); static void test_rpc_session_close_callback(void* context);
static void test_rpc_session_terminated_callback(void* context);
static void test_rpc_setup(void) { static void test_rpc_setup(void) {
furi_check(!rpc); furi_check(!rpc);
@ -89,16 +91,21 @@ static void test_rpc_setup(void) {
rpc_session_context.output_stream = xStreamBufferCreate(1000, 1); rpc_session_context.output_stream = xStreamBufferCreate(1000, 1);
rpc_session_set_send_bytes_callback(session, output_bytes_callback); rpc_session_set_send_bytes_callback(session, output_bytes_callback);
rpc_session_context.close_session_semaphore = xSemaphoreCreateBinary(); rpc_session_context.close_session_semaphore = xSemaphoreCreateBinary();
rpc_session_context.terminate_semaphore = xSemaphoreCreateBinary();
rpc_session_set_close_callback(session, test_rpc_session_close_callback); rpc_session_set_close_callback(session, test_rpc_session_close_callback);
rpc_session_set_terminated_callback(session, test_rpc_session_terminated_callback);
rpc_session_set_context(session, &rpc_session_context); rpc_session_set_context(session, &rpc_session_context);
} }
static void test_rpc_teardown(void) { static void test_rpc_teardown(void) {
furi_check(rpc_session_context.close_session_semaphore); furi_check(rpc_session_context.close_session_semaphore);
xSemaphoreTake(rpc_session_context.terminate_semaphore, 0);
rpc_session_close(session); rpc_session_close(session);
furi_check(xSemaphoreTake(rpc_session_context.terminate_semaphore, portMAX_DELAY));
furi_record_close("rpc"); furi_record_close("rpc");
vStreamBufferDelete(rpc_session_context.output_stream); vStreamBufferDelete(rpc_session_context.output_stream);
vSemaphoreDelete(rpc_session_context.close_session_semaphore); vSemaphoreDelete(rpc_session_context.close_session_semaphore);
vSemaphoreDelete(rpc_session_context.terminate_semaphore);
++command_id; ++command_id;
rpc_session_context.output_stream = NULL; rpc_session_context.output_stream = NULL;
rpc_session_context.close_session_semaphore = NULL; rpc_session_context.close_session_semaphore = NULL;
@ -129,6 +136,13 @@ static void test_rpc_session_close_callback(void* context) {
xSemaphoreGive(callbacks_context->close_session_semaphore); xSemaphoreGive(callbacks_context->close_session_semaphore);
} }
static void test_rpc_session_terminated_callback(void* context) {
furi_check(context);
RpcSessionContext* callbacks_context = context;
xSemaphoreGive(callbacks_context->terminate_semaphore);
}
static void clean_directory(Storage* fs_api, const char* clean_dir) { static void clean_directory(Storage* fs_api, const char* clean_dir) {
furi_check(fs_api); furi_check(fs_api);
furi_check(clean_dir); furi_check(clean_dir);