[FL-2839] Furi stream buffer (#1834)

* Core: stream buffer
* stream buffer: API and usage
* stream buffer: documentation
* stream buffer: more documentation
* Furi: fix spelling

Co-authored-by: Aleksandr Kutuzov <alleteam@gmail.com>
This commit is contained in:
Sergey Gavrilov
2022-10-07 22:27:11 +10:00
committed by GitHub
parent d1843c0094
commit 38a82a1907
21 changed files with 403 additions and 208 deletions

View File

@@ -3,7 +3,6 @@
#include <notification/notification.h>
#include <notification/notification_messages.h>
#include <gui/elements.h>
#include <stream_buffer.h>
#include <furi_hal_uart.h>
#include <furi_hal_console.h>
#include <gui/view_dispatcher.h>
@@ -20,7 +19,7 @@ typedef struct {
ViewDispatcher* view_dispatcher;
View* view;
FuriThread* worker_thread;
StreamBufferHandle_t rx_stream;
FuriStreamBuffer* rx_stream;
} UartEchoApp;
typedef struct {
@@ -92,13 +91,11 @@ static uint32_t uart_echo_exit(void* context) {
static void uart_echo_on_irq_cb(UartIrqEvent ev, uint8_t data, void* context) {
furi_assert(context);
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
UartEchoApp* app = context;
if(ev == UartIrqEventRXNE) {
xStreamBufferSendFromISR(app->rx_stream, &data, 1, &xHigherPriorityTaskWoken);
furi_stream_buffer_send(app->rx_stream, &data, 1, 0);
furi_thread_flags_set(furi_thread_get_id(app->worker_thread), WorkerEventRx);
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
}
@@ -158,7 +155,7 @@ static int32_t uart_echo_worker(void* context) {
size_t length = 0;
do {
uint8_t data[64];
length = xStreamBufferReceive(app->rx_stream, data, 64, 0);
length = furi_stream_buffer_receive(app->rx_stream, data, 64, 0);
if(length > 0) {
furi_hal_uart_tx(FuriHalUartIdUSART1, data, length);
with_view_model(
@@ -186,7 +183,7 @@ static int32_t uart_echo_worker(void* context) {
static UartEchoApp* uart_echo_app_alloc() {
UartEchoApp* app = malloc(sizeof(UartEchoApp));
app->rx_stream = xStreamBufferCreate(2048, 1);
app->rx_stream = furi_stream_buffer_alloc(2048, 1);
// Gui
app->gui = furi_record_open(RECORD_GUI);
@@ -260,7 +257,7 @@ static void uart_echo_app_free(UartEchoApp* app) {
furi_record_close(RECORD_NOTIFICATION);
app->gui = NULL;
vStreamBufferDelete(app->rx_stream);
furi_stream_buffer_free(app->rx_stream);
// Free rest
free(app);

View File

@@ -10,7 +10,6 @@
#include <furi.h>
#include "../minunit.h"
#include <stdint.h>
#include <stream_buffer.h>
#include <pb.h>
#include <pb_encode.h>
#include <m-list.h>
@@ -34,7 +33,7 @@ static uint32_t command_id = 0;
typedef struct {
RpcSession* session;
StreamBufferHandle_t output_stream;
FuriStreamBuffer* output_stream;
SemaphoreHandle_t close_session_semaphore;
SemaphoreHandle_t terminate_semaphore;
TickType_t timeout;
@@ -90,7 +89,7 @@ static void test_rpc_setup(void) {
}
furi_check(rpc_session[0].session);
rpc_session[0].output_stream = xStreamBufferCreate(1000, 1);
rpc_session[0].output_stream = furi_stream_buffer_alloc(1000, 1);
rpc_session_set_send_bytes_callback(rpc_session[0].session, output_bytes_callback);
rpc_session[0].close_session_semaphore = xSemaphoreCreateBinary();
rpc_session[0].terminate_semaphore = xSemaphoreCreateBinary();
@@ -110,7 +109,7 @@ static void test_rpc_setup_second_session(void) {
}
furi_check(rpc_session[1].session);
rpc_session[1].output_stream = xStreamBufferCreate(1000, 1);
rpc_session[1].output_stream = furi_stream_buffer_alloc(1000, 1);
rpc_session_set_send_bytes_callback(rpc_session[1].session, output_bytes_callback);
rpc_session[1].close_session_semaphore = xSemaphoreCreateBinary();
rpc_session[1].terminate_semaphore = xSemaphoreCreateBinary();
@@ -126,7 +125,7 @@ static void test_rpc_teardown(void) {
rpc_session_close(rpc_session[0].session);
furi_check(xSemaphoreTake(rpc_session[0].terminate_semaphore, portMAX_DELAY));
furi_record_close(RECORD_RPC);
vStreamBufferDelete(rpc_session[0].output_stream);
furi_stream_buffer_free(rpc_session[0].output_stream);
vSemaphoreDelete(rpc_session[0].close_session_semaphore);
vSemaphoreDelete(rpc_session[0].terminate_semaphore);
++command_id;
@@ -141,7 +140,7 @@ static void test_rpc_teardown_second_session(void) {
xSemaphoreTake(rpc_session[1].terminate_semaphore, 0);
rpc_session_close(rpc_session[1].session);
furi_check(xSemaphoreTake(rpc_session[1].terminate_semaphore, portMAX_DELAY));
vStreamBufferDelete(rpc_session[1].output_stream);
furi_stream_buffer_free(rpc_session[1].output_stream);
vSemaphoreDelete(rpc_session[1].close_session_semaphore);
vSemaphoreDelete(rpc_session[1].terminate_semaphore);
++command_id;
@@ -268,8 +267,8 @@ static PB_CommandStatus test_rpc_storage_get_file_error(File* file) {
static void output_bytes_callback(void* ctx, uint8_t* got_bytes, size_t got_size) {
RpcSessionContext* callbacks_context = ctx;
size_t bytes_sent =
xStreamBufferSend(callbacks_context->output_stream, got_bytes, got_size, FuriWaitForever);
size_t bytes_sent = furi_stream_buffer_send(
callbacks_context->output_stream, got_bytes, got_size, FuriWaitForever);
(void)bytes_sent;
furi_check(bytes_sent == got_size);
}
@@ -534,7 +533,8 @@ static bool test_rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_
TickType_t now = xTaskGetTickCount();
int32_t time_left = session_context->timeout - now;
time_left = MAX(time_left, 0);
bytes_received = xStreamBufferReceive(session_context->output_stream, buf, count, time_left);
bytes_received =
furi_stream_buffer_receive(session_context->output_stream, buf, count, time_left);
return (count == bytes_received);
}

View File

@@ -1,6 +1,5 @@
#include "usb_uart_bridge.h"
#include "furi_hal.h"
#include <stream_buffer.h>
#include <furi_hal_usb_cdc.h>
#include "usb_cdc.h"
#include "cli/cli_vcp.h"
@@ -43,7 +42,7 @@ struct UsbUartBridge {
FuriThread* thread;
FuriThread* tx_thread;
StreamBufferHandle_t rx_stream;
FuriStreamBuffer* rx_stream;
FuriMutex* usb_mutex;
@@ -74,12 +73,10 @@ static int32_t usb_uart_tx_thread(void* context);
static void usb_uart_on_irq_cb(UartIrqEvent ev, uint8_t data, void* context) {
UsbUartBridge* usb_uart = (UsbUartBridge*)context;
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
if(ev == UartIrqEventRXNE) {
xStreamBufferSendFromISR(usb_uart->rx_stream, &data, 1, &xHigherPriorityTaskWoken);
furi_stream_buffer_send(usb_uart->rx_stream, &data, 1, 0);
furi_thread_flags_set(furi_thread_get_id(usb_uart->thread), WorkerEvtRxDone);
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
}
@@ -156,7 +153,7 @@ static int32_t usb_uart_worker(void* context) {
memcpy(&usb_uart->cfg, &usb_uart->cfg_new, sizeof(UsbUartConfig));
usb_uart->rx_stream = xStreamBufferCreate(USB_UART_RX_BUF_SIZE, 1);
usb_uart->rx_stream = furi_stream_buffer_alloc(USB_UART_RX_BUF_SIZE, 1);
usb_uart->tx_sem = furi_semaphore_alloc(1, 1);
usb_uart->usb_mutex = furi_mutex_alloc(FuriMutexTypeNormal);
@@ -189,8 +186,8 @@ static int32_t usb_uart_worker(void* context) {
furi_check((events & FuriFlagError) == 0);
if(events & WorkerEvtStop) break;
if(events & WorkerEvtRxDone) {
size_t len =
xStreamBufferReceive(usb_uart->rx_stream, usb_uart->rx_buf, USB_CDC_PKT_LEN, 0);
size_t len = furi_stream_buffer_receive(
usb_uart->rx_stream, usb_uart->rx_buf, USB_CDC_PKT_LEN, 0);
if(len > 0) {
if(furi_semaphore_acquire(usb_uart->tx_sem, 100) == FuriStatusOk) {
usb_uart->st.rx_cnt += len;
@@ -199,7 +196,7 @@ static int32_t usb_uart_worker(void* context) {
furi_hal_cdc_send(usb_uart->cfg.vcp_ch, usb_uart->rx_buf, len);
furi_check(furi_mutex_release(usb_uart->usb_mutex) == FuriStatusOk);
} else {
xStreamBufferReset(usb_uart->rx_stream);
furi_stream_buffer_reset(usb_uart->rx_stream);
}
}
}
@@ -270,7 +267,7 @@ static int32_t usb_uart_worker(void* context) {
furi_thread_join(usb_uart->tx_thread);
furi_thread_free(usb_uart->tx_thread);
vStreamBufferDelete(usb_uart->rx_stream);
furi_stream_buffer_free(usb_uart->rx_stream);
furi_mutex_free(usb_uart->usb_mutex);
furi_semaphore_free(usb_uart->tx_sem);

View File

@@ -2,7 +2,6 @@
#include <furi.h>
#include <furi_hal.h>
#include <stream_buffer.h>
#include <lib/toolbox/args.h>
#include <lib/subghz/subghz_keystore.h>
@@ -194,23 +193,21 @@ void subghz_cli_command_tx(Cli* cli, FuriString* args, void* context) {
typedef struct {
volatile bool overrun;
StreamBufferHandle_t stream;
FuriStreamBuffer* stream;
size_t packet_count;
} SubGhzCliCommandRx;
static void subghz_cli_command_rx_capture_callback(bool level, uint32_t duration, void* context) {
SubGhzCliCommandRx* instance = context;
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
LevelDuration level_duration = level_duration_make(level, duration);
if(instance->overrun) {
instance->overrun = false;
level_duration = level_duration_reset();
}
size_t ret = xStreamBufferSendFromISR(
instance->stream, &level_duration, sizeof(LevelDuration), &xHigherPriorityTaskWoken);
size_t ret =
furi_stream_buffer_send(instance->stream, &level_duration, sizeof(LevelDuration), 0);
if(sizeof(LevelDuration) != ret) instance->overrun = true;
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
static void subghz_cli_command_rx_callback(
@@ -249,7 +246,8 @@ void subghz_cli_command_rx(Cli* cli, FuriString* args, void* context) {
// Allocate context and buffers
SubGhzCliCommandRx* instance = malloc(sizeof(SubGhzCliCommandRx));
instance->stream = xStreamBufferCreate(sizeof(LevelDuration) * 1024, sizeof(LevelDuration));
instance->stream =
furi_stream_buffer_alloc(sizeof(LevelDuration) * 1024, sizeof(LevelDuration));
furi_check(instance->stream);
SubGhzEnvironment* environment = subghz_environment_alloc();
@@ -279,8 +277,8 @@ void subghz_cli_command_rx(Cli* cli, FuriString* args, void* context) {
printf("Listening at %lu. Press CTRL+C to stop\r\n", frequency);
LevelDuration level_duration;
while(!cli_cmd_interrupt_received(cli)) {
int ret =
xStreamBufferReceive(instance->stream, &level_duration, sizeof(LevelDuration), 10);
int ret = furi_stream_buffer_receive(
instance->stream, &level_duration, sizeof(LevelDuration), 10);
if(ret == sizeof(LevelDuration)) {
if(level_duration_is_reset(level_duration)) {
printf(".");
@@ -304,7 +302,7 @@ void subghz_cli_command_rx(Cli* cli, FuriString* args, void* context) {
// Cleanup
subghz_receiver_free(receiver);
subghz_environment_free(environment);
vStreamBufferDelete(instance->stream);
furi_stream_buffer_free(instance->stream);
free(instance);
}

View File

@@ -7,7 +7,6 @@
#include <time.h>
#include <notification/notification_messages.h>
#include <loader/loader.h>
#include <stream_buffer.h>
// Close to ISO, `date +'%Y-%m-%d %H:%M:%S %u'`
#define CLI_DATE_FORMAT "%.4d-%.2d-%.2d %.2d:%.2d:%.2d %d"
@@ -140,7 +139,7 @@ void cli_command_date(Cli* cli, FuriString* args, void* context) {
#define CLI_COMMAND_LOG_BUFFER_SIZE 64
void cli_command_log_tx_callback(const uint8_t* buffer, size_t size, void* context) {
xStreamBufferSend(context, buffer, size, 0);
furi_stream_buffer_send(context, buffer, size, 0);
}
void cli_command_log_level_set_from_string(FuriString* level) {
@@ -165,7 +164,7 @@ void cli_command_log_level_set_from_string(FuriString* level) {
void cli_command_log(Cli* cli, FuriString* args, void* context) {
UNUSED(context);
StreamBufferHandle_t ring = xStreamBufferCreate(CLI_COMMAND_LOG_RING_SIZE, 1);
FuriStreamBuffer* ring = furi_stream_buffer_alloc(CLI_COMMAND_LOG_RING_SIZE, 1);
uint8_t buffer[CLI_COMMAND_LOG_BUFFER_SIZE];
FuriLogLevel previous_level = furi_log_get_level();
bool restore_log_level = false;
@@ -179,7 +178,7 @@ void cli_command_log(Cli* cli, FuriString* args, void* context) {
printf("Press CTRL+C to stop...\r\n");
while(!cli_cmd_interrupt_received(cli)) {
size_t ret = xStreamBufferReceive(ring, buffer, CLI_COMMAND_LOG_BUFFER_SIZE, 50);
size_t ret = furi_stream_buffer_receive(ring, buffer, CLI_COMMAND_LOG_BUFFER_SIZE, 50);
cli_write(cli, buffer, ret);
}
@@ -190,7 +189,7 @@ void cli_command_log(Cli* cli, FuriString* args, void* context) {
furi_log_set_level(previous_level);
}
vStreamBufferDelete(ring);
furi_stream_buffer_free(ring);
}
void cli_command_vibro(Cli* cli, FuriString* args, void* context) {

View File

@@ -1,7 +1,6 @@
#include <furi_hal_usb_cdc.h>
#include <furi_hal.h>
#include <furi.h>
#include <stream_buffer.h>
#include "cli_i.h"
#define TAG "CliVcp"
@@ -29,8 +28,8 @@ typedef enum {
typedef struct {
FuriThread* thread;
StreamBufferHandle_t tx_stream;
StreamBufferHandle_t rx_stream;
FuriStreamBuffer* tx_stream;
FuriStreamBuffer* rx_stream;
volatile bool connected;
volatile bool running;
@@ -62,8 +61,8 @@ static const uint8_t ascii_eot = 0x04;
static void cli_vcp_init() {
if(vcp == NULL) {
vcp = malloc(sizeof(CliVcp));
vcp->tx_stream = xStreamBufferCreate(VCP_TX_BUF_SIZE, 1);
vcp->rx_stream = xStreamBufferCreate(VCP_RX_BUF_SIZE, 1);
vcp->tx_stream = furi_stream_buffer_alloc(VCP_TX_BUF_SIZE, 1);
vcp->rx_stream = furi_stream_buffer_alloc(VCP_RX_BUF_SIZE, 1);
}
furi_assert(vcp->thread == NULL);
@@ -113,7 +112,7 @@ static int32_t vcp_worker(void* context) {
#endif
if(vcp->connected == false) {
vcp->connected = true;
xStreamBufferSend(vcp->rx_stream, &ascii_soh, 1, FuriWaitForever);
furi_stream_buffer_send(vcp->rx_stream, &ascii_soh, 1, FuriWaitForever);
}
}
@@ -124,8 +123,8 @@ static int32_t vcp_worker(void* context) {
#endif
if(vcp->connected == true) {
vcp->connected = false;
xStreamBufferReceive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
xStreamBufferSend(vcp->rx_stream, &ascii_eot, 1, FuriWaitForever);
furi_stream_buffer_receive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
furi_stream_buffer_send(vcp->rx_stream, &ascii_eot, 1, FuriWaitForever);
}
}
@@ -134,7 +133,7 @@ static int32_t vcp_worker(void* context) {
#ifdef CLI_VCP_DEBUG
FURI_LOG_D(TAG, "StreamRx");
#endif
if(xStreamBufferSpacesAvailable(vcp->rx_stream) >= USB_CDC_PKT_LEN) {
if(furi_stream_buffer_spaces_available(vcp->rx_stream) >= USB_CDC_PKT_LEN) {
flags |= VcpEvtRx;
missed_rx--;
}
@@ -142,14 +141,15 @@ static int32_t vcp_worker(void* context) {
// New data received
if(flags & VcpEvtRx) {
if(xStreamBufferSpacesAvailable(vcp->rx_stream) >= USB_CDC_PKT_LEN) {
if(furi_stream_buffer_spaces_available(vcp->rx_stream) >= USB_CDC_PKT_LEN) {
int32_t len = furi_hal_cdc_receive(VCP_IF_NUM, vcp->data_buffer, USB_CDC_PKT_LEN);
#ifdef CLI_VCP_DEBUG
FURI_LOG_D(TAG, "Rx %d", len);
#endif
if(len > 0) {
furi_check(
xStreamBufferSend(vcp->rx_stream, vcp->data_buffer, len, FuriWaitForever) ==
furi_stream_buffer_send(
vcp->rx_stream, vcp->data_buffer, len, FuriWaitForever) ==
(size_t)len);
}
} else {
@@ -173,7 +173,7 @@ static int32_t vcp_worker(void* context) {
// CDC write transfer done
if(flags & VcpEvtTx) {
size_t len =
xStreamBufferReceive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
furi_stream_buffer_receive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
#ifdef CLI_VCP_DEBUG
FURI_LOG_D(TAG, "Tx %d", len);
#endif
@@ -202,8 +202,8 @@ static int32_t vcp_worker(void* context) {
furi_hal_usb_unlock();
furi_hal_usb_set_config(vcp->usb_if_prev, NULL);
}
xStreamBufferReceive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
xStreamBufferSend(vcp->rx_stream, &ascii_eot, 1, FuriWaitForever);
furi_stream_buffer_receive(vcp->tx_stream, vcp->data_buffer, USB_CDC_PKT_LEN, 0);
furi_stream_buffer_send(vcp->rx_stream, &ascii_eot, 1, FuriWaitForever);
break;
}
}
@@ -229,7 +229,7 @@ static size_t cli_vcp_rx(uint8_t* buffer, size_t size, uint32_t timeout) {
size_t batch_size = size;
if(batch_size > VCP_RX_BUF_SIZE) batch_size = VCP_RX_BUF_SIZE;
size_t len = xStreamBufferReceive(vcp->rx_stream, buffer, batch_size, timeout);
size_t len = furi_stream_buffer_receive(vcp->rx_stream, buffer, batch_size, timeout);
#ifdef CLI_VCP_DEBUG
FURI_LOG_D(TAG, "rx %u ", batch_size);
#endif
@@ -262,7 +262,7 @@ static void cli_vcp_tx(const uint8_t* buffer, size_t size) {
size_t batch_size = size;
if(batch_size > USB_CDC_PKT_LEN) batch_size = USB_CDC_PKT_LEN;
xStreamBufferSend(vcp->tx_stream, buffer, batch_size, FuriWaitForever);
furi_stream_buffer_send(vcp->tx_stream, buffer, batch_size, FuriWaitForever);
furi_thread_flags_set(furi_thread_get_id(vcp->thread), VcpEvtStreamTx);
#ifdef CLI_VCP_DEBUG
FURI_LOG_D(TAG, "tx %u", batch_size);

View File

@@ -13,7 +13,6 @@
#include <cli/cli.h>
#include <stdint.h>
#include <stdio.h>
#include <stream_buffer.h>
#include <m-dict.h>
#define TAG "RpcSrv"
@@ -61,7 +60,7 @@ struct RpcSession {
FuriThread* thread;
RpcHandlerDict_t handlers;
StreamBufferHandle_t stream;
FuriStreamBuffer* stream;
PB_Main* decoded_message;
bool terminate;
void** system_contexts;
@@ -151,7 +150,7 @@ size_t
furi_assert(encoded_bytes);
furi_assert(size > 0);
size_t bytes_sent = xStreamBufferSend(session->stream, encoded_bytes, size, timeout);
size_t bytes_sent = furi_stream_buffer_send(session->stream, encoded_bytes, size, timeout);
furi_thread_flags_set(furi_thread_get_id(session->thread), RpcEvtNewData);
@@ -160,7 +159,7 @@ size_t
size_t rpc_session_get_available_size(RpcSession* session) {
furi_assert(session);
return xStreamBufferSpacesAvailable(session->stream);
return furi_stream_buffer_spaces_available(session->stream);
}
bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) {
@@ -174,9 +173,9 @@ bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) {
size_t bytes_received = 0;
while(1) {
bytes_received +=
xStreamBufferReceive(session->stream, buf + bytes_received, count - bytes_received, 0);
if(xStreamBufferIsEmpty(session->stream)) {
bytes_received += furi_stream_buffer_receive(
session->stream, buf + bytes_received, count - bytes_received, 0);
if(furi_stream_buffer_is_empty(session->stream)) {
if(session->buffer_is_empty_callback) {
session->buffer_is_empty_callback(session->context);
}
@@ -190,7 +189,7 @@ bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) {
} else {
flags = furi_thread_flags_wait(RPC_ALL_EVENTS, FuriFlagWaitAny, FuriWaitForever);
if(flags & RpcEvtDisconnect) {
if(xStreamBufferIsEmpty(session->stream)) {
if(furi_stream_buffer_is_empty(session->stream)) {
session->terminate = true;
istream->bytes_left = 0;
bytes_received = 0;
@@ -279,7 +278,7 @@ static int32_t rpc_session_worker(void* context) {
}
if(message_decode_failed) {
xStreamBufferReset(session->stream);
furi_stream_buffer_reset(session->stream);
if(!session->terminate) {
/* Protobuf can't determine start and end of message.
* Handle this by adding varint at beginning
@@ -329,7 +328,7 @@ static void rpc_session_free_callback(FuriThreadState thread_state, void* contex
free(session->system_contexts);
free(session->decoded_message);
RpcHandlerDict_clear(session->handlers);
vStreamBufferDelete(session->stream);
furi_stream_buffer_free(session->stream);
furi_mutex_acquire(session->callbacks_mutex, FuriWaitForever);
if(session->terminated_callback) {
@@ -348,7 +347,7 @@ RpcSession* rpc_session_open(Rpc* rpc) {
RpcSession* session = malloc(sizeof(RpcSession));
session->callbacks_mutex = furi_mutex_alloc(FuriMutexTypeNormal);
session->stream = xStreamBufferCreate(RPC_BUFFER_SIZE, 1);
session->stream = furi_stream_buffer_alloc(RPC_BUFFER_SIZE, 1);
session->rpc = rpc;
session->terminate = false;
session->decode_error = false;