[FL-2116] RPC: stop session on decode error (#959)
* [FL-2116] RPC: stop session on decode error * Rollback protobuf * Send only DECODE_ERR, fix session tests * Tests: replace asserts with checks. Fix broken production build Co-authored-by: あく <alleteam@gmail.com>
This commit is contained in:
@@ -59,6 +59,7 @@ struct RpcSession {
|
||||
Rpc* rpc;
|
||||
bool terminate;
|
||||
void** system_contexts;
|
||||
bool decode_error;
|
||||
};
|
||||
|
||||
struct Rpc {
|
||||
@@ -78,11 +79,13 @@ static void rpc_close_session_process(const PB_Main* msg_request, void* context)
|
||||
furi_assert(context);
|
||||
|
||||
Rpc* rpc = context;
|
||||
rpc_send_and_release_empty(rpc, msg_request->command_id, PB_CommandStatus_OK);
|
||||
|
||||
rpc_send_and_release_empty(rpc, msg_request->command_id, PB_CommandStatus_OK);
|
||||
osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever);
|
||||
if(rpc->session.closed_callback) {
|
||||
rpc->session.closed_callback(rpc->session.context);
|
||||
} else {
|
||||
FURI_LOG_W(TAG, "Session stop doesn't processed by transport layer");
|
||||
}
|
||||
osMutexRelease(rpc->session.callbacks_mutex);
|
||||
}
|
||||
@@ -378,6 +381,7 @@ RpcSession* rpc_session_open(Rpc* rpc) {
|
||||
session->callbacks_mutex = osMutexNew(NULL);
|
||||
session->rpc = rpc;
|
||||
session->terminate = false;
|
||||
session->decode_error = false;
|
||||
xStreamBufferReset(rpc->stream);
|
||||
|
||||
session->system_contexts = furi_alloc(COUNT_OF(rpc_systems) * sizeof(void*));
|
||||
@@ -507,6 +511,10 @@ bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) {
|
||||
rpc->session.buffer_is_empty_callback(rpc->session.context);
|
||||
}
|
||||
}
|
||||
if(rpc->session.decode_error) {
|
||||
/* never go out till RPC_EVENT_DISCONNECT come */
|
||||
bytes_received = 0;
|
||||
}
|
||||
if(count == bytes_received) {
|
||||
break;
|
||||
} else {
|
||||
@@ -595,6 +603,8 @@ int32_t rpc_srv(void* p) {
|
||||
.bytes_left = RPC_MAX_MESSAGE_SIZE, /* max incoming message size */
|
||||
};
|
||||
|
||||
bool message_decode_failed = false;
|
||||
|
||||
if(pb_decode_ex(&istream, &PB_Main_msg, rpc->decoded_message, PB_DECODE_DELIMITED)) {
|
||||
#if SRV_RPC_DEBUG
|
||||
FURI_LOG_I(TAG, "INPUT:");
|
||||
@@ -605,13 +615,49 @@ int32_t rpc_srv(void* p) {
|
||||
|
||||
if(handler && handler->message_handler) {
|
||||
handler->message_handler(rpc->decoded_message, handler->context);
|
||||
} else if(rpc->decoded_message->which_content == 0) {
|
||||
/* Receiving zeroes means message is 0-length, which
|
||||
* is valid for proto3: all fields are filled with default values.
|
||||
* 0 - is default value for which_content field.
|
||||
* Mark it as decode error, because there is no content message
|
||||
* in Main message with tag 0.
|
||||
*/
|
||||
message_decode_failed = true;
|
||||
} else if(!handler && !rpc->session.terminate) {
|
||||
FURI_LOG_E(TAG, "Unhandled message, tag: %d", rpc->decoded_message->which_content);
|
||||
FURI_LOG_E(
|
||||
TAG,
|
||||
"Message(%d) decoded, but not implemented",
|
||||
rpc->decoded_message->which_content);
|
||||
rpc_send_and_release_empty(
|
||||
rpc, rpc->decoded_message->command_id, PB_CommandStatus_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
} else {
|
||||
message_decode_failed = true;
|
||||
}
|
||||
|
||||
if(message_decode_failed) {
|
||||
xStreamBufferReset(rpc->stream);
|
||||
if(!rpc->session.terminate) {
|
||||
/* Protobuf can't determine start and end of message.
|
||||
* Handle this by adding varint at beginning
|
||||
* of a message (PB_ENCODE_DELIMITED). But decoding fail
|
||||
* means we can't be sure next bytes are varint for next
|
||||
* message, so the only way to close session.
|
||||
* RPC itself can't make decision to close session. It has
|
||||
* to notify:
|
||||
* 1) down layer (transport)
|
||||
* 2) other side (companion app)
|
||||
* Who are responsible to handle RPC session lifecycle.
|
||||
* Companion receives 2 messages: ERROR_DECODE and session_closed.
|
||||
*/
|
||||
FURI_LOG_E(TAG, "Decode failed, error: \'%.128s\'", PB_GET_ERROR(&istream));
|
||||
rpc->session.decode_error = true;
|
||||
rpc_send_and_release_empty(rpc, 0, PB_CommandStatus_ERROR_DECODE);
|
||||
osMutexAcquire(rpc->session.callbacks_mutex, osWaitForever);
|
||||
if(rpc->session.closed_callback) {
|
||||
rpc->session.closed_callback(rpc->session.context);
|
||||
}
|
||||
osMutexRelease(rpc->session.callbacks_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user