diff --git a/firmware/targets/f7/furi_hal/furi_hal_flash.c b/firmware/targets/f7/furi_hal/furi_hal_flash.c index e49cd5f2..fc021d96 100644 --- a/firmware/targets/f7/furi_hal/furi_hal_flash.c +++ b/firmware/targets/f7/furi_hal/furi_hal_flash.c @@ -488,7 +488,7 @@ static const FuriHalFlashObMapping furi_hal_flash_ob_reg_map[FURI_HAL_FLASH_OB_T OB_REG_DEF(FuriHalFlashObInvalid, (NULL)), OB_REG_DEF(FuriHalFlashObInvalid, (NULL)), - OB_REG_DEF(FuriHalFlashObRegisterIPCCMail, (NULL)), + OB_REG_DEF(FuriHalFlashObRegisterIPCCMail, (&FLASH->IPCCBR)), OB_REG_DEF(FuriHalFlashObRegisterSecureFlash, (NULL)), OB_REG_DEF(FuriHalFlashObRegisterC2Opts, (NULL)), }; diff --git a/scripts/flipper/utils/openocd.py b/scripts/flipper/utils/openocd.py new file mode 100644 index 00000000..1309055b --- /dev/null +++ b/scripts/flipper/utils/openocd.py @@ -0,0 +1,173 @@ +import socket +import subprocess +import logging + + +class OpenOCD: + """OpenOCD cli wrapper""" + + COMMAND_TOKEN = "\x1a" + + def __init__(self, config: dict = {}) -> None: + assert isinstance(config, dict) + + # Params base + self.params = [] + + self.gdb_port = 3333 + self.telnet_port = 4444 + self.tcl_port = 6666 + + # Port + if port_base := config.get("port_base", None): + self.gdb_port = port_base + self.tcl_port = port_base + 1 + self.telnet_port = port_base + 2 + + self._add_command(f"gdb_port {self.gdb_port}") + self._add_command(f"tcl_port {self.tcl_port}") + self._add_command(f"telnet_port {self.telnet_port}") + + # Config files + + if interface := config.get("interface", None): + pass + else: + interface = "interface/stlink.cfg" + + if target := config.get("target", None): + pass + else: + target = "target/stm32wbx.cfg" + + self._add_file(interface) + self._add_file(target) + + # Programmer settings + if serial := config.get("serial", None): + self._add_command(f"{serial}") + + # Other params + if "params" in config: + self.params += config["params"] + + # logging + self.logger = logging.getLogger() + + def _add_command(self, command: str): + self.params.append("-c") + self.params.append(command) + + def _add_file(self, file: str): + self.params.append("-f") + self.params.append(file) + + def start(self, args: list[str] = []): + """Start OpenOCD process""" + + params = ["openocd", *self.params, *args] + self.logger.debug(f"_execute: {params}") + self.process = subprocess.Popen( + params, stderr=subprocess.PIPE, stdout=subprocess.PIPE + ) + + self._wait_for_openocd_tcl() + + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect(("127.0.0.1", self.tcl_port)) + + def _wait_for_openocd_tcl(self): + """Wait for OpenOCD to start""" + # TODO: timeout + while True: + stderr = self.process.stderr + if not stderr: + break + line = stderr.readline() + if not line: + break + line = line.decode("utf-8").strip() + self.logger.debug(f"OpenOCD: {line}") + if "Listening on port" in line and "for tcl connections" in line: + break + + def stop(self): + self.send_tcl("exit") + self.send_tcl("shutdown") + self.socket.close() + try: + self.process.wait(timeout=10) + except subprocess.TimeoutExpired as e: + self.process.kill() + self.logger.error("Failed to stop OpenOCD") + self.logger.exception(e) + self.postmortem() + + def send_tcl(self, cmd) -> str: + """Send a command string to TCL RPC. Return the result that was read.""" + + try: + data = (cmd + OpenOCD.COMMAND_TOKEN).encode("utf-8") + self.logger.debug(f"<- {data}") + + self.socket.send(data) + except Exception as e: + self.logger.error("Failed to send command to OpenOCD") + self.logger.exception(e) + self.postmortem() + raise + + try: + data = self._recv() + return data + except Exception as e: + self.logger.error("Failed to receive response from OpenOCD") + self.logger.exception(e) + self.postmortem() + raise + + def _recv(self): + """Read from the stream until the token (\x1a) was received.""" + # TODO: timeout + data = bytes() + while True: + chunk = self.socket.recv(4096) + data += chunk + if bytes(OpenOCD.COMMAND_TOKEN, encoding="utf-8") in chunk: + break + + self.logger.debug(f"-> {data}") + + data = data.decode("utf-8").strip() + data = data[:-1] # strip trailing \x1a + + return data + + def postmortem(self) -> None: + """Postmortem analysis of the OpenOCD process""" + stdout, stderr = self.process.communicate() + + log = self.logger.error + if self.process.returncode == 0: + log = self.logger.debug + log("OpenOCD exited normally") + else: + log("OpenOCD exited with error") + + log(f"Exit code: {self.process.returncode}") + for line in stdout.decode("utf-8").splitlines(): + log(f"Stdout: {line}") + + for line in stderr.decode("utf-8").splitlines(): + log(f"Stderr: {line}") + + def read_32(self, addr: int) -> int: + """Read 32-bit value from memory""" + data = self.send_tcl(f"mdw {addr}").strip() + data = data.split(": ")[-1] + data = int(data, 16) + return data + + def write_32(self, addr: int, value: int) -> None: + """Write 32-bit value to memory""" + self.send_tcl(f"mww {addr} {value}") diff --git a/scripts/flipper/utils/programmer.py b/scripts/flipper/utils/programmer.py new file mode 100644 index 00000000..84452d15 --- /dev/null +++ b/scripts/flipper/utils/programmer.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from enum import Enum + + +class Programmer(ABC): + def __init__(self): + pass + + class RunMode(Enum): + Run = "run" + Stop = "stop" + + @abstractmethod + def reset(self, mode: RunMode = RunMode.Run) -> bool: + pass + + @abstractmethod + def flash(self, address: int, file_path: str, verify: bool = True) -> bool: + pass + + @abstractmethod + def option_bytes_validate(self, file_path: str) -> bool: + pass + + @abstractmethod + def option_bytes_set(self, file_path: str) -> bool: + pass + + @abstractmethod + def otp_write(self, address: int, file_path: str) -> bool: + pass diff --git a/scripts/flipper/utils/programmer_openocd.py b/scripts/flipper/utils/programmer_openocd.py new file mode 100644 index 00000000..b3340610 --- /dev/null +++ b/scripts/flipper/utils/programmer_openocd.py @@ -0,0 +1,281 @@ +import logging +import os +import typing + +from flipper.utils.programmer import Programmer +from flipper.utils.openocd import OpenOCD +from flipper.utils.stm32wb55 import STM32WB55 +from flipper.assets.obdata import OptionBytesData + + +class OpenOCDProgrammer(Programmer): + def __init__( + self, + interface: str = "interface/cmsis-dap.cfg", + port_base: typing.Union[int, None] = None, + serial: typing.Union[str, None] = None, + ): + super().__init__() + + config = {} + + config["interface"] = interface + config["target"] = "target/stm32wbx.cfg" + + if not serial is None: + if interface == "interface/cmsis-dap.cfg": + config["serial"] = f"cmsis_dap_serial {serial}" + elif "stlink" in interface: + config["serial"] = f"stlink_serial {serial}" + + if not port_base is None: + config["port_base"] = port_base + + self.openocd = OpenOCD(config) + self.logger = logging.getLogger() + + def reset(self, mode: Programmer.RunMode = Programmer.RunMode.Run) -> bool: + stm32 = STM32WB55() + if mode == Programmer.RunMode.Run: + stm32.reset(self.openocd, stm32.RunMode.Run) + elif mode == Programmer.RunMode.Stop: + stm32.reset(self.openocd, stm32.RunMode.Init) + else: + raise Exception("Unknown mode") + + return True + + def flash(self, address: int, file_path: str, verify: bool = True) -> bool: + if not os.path.exists(file_path): + raise Exception(f"File {file_path} not found") + + self.openocd.start() + self.openocd.send_tcl(f"init") + self.openocd.send_tcl( + f"program {file_path} 0x{address:08x}{' verify' if verify else ''} reset exit" + ) + self.openocd.stop() + + return True + + def _ob_print_diff_table(self, ob_reference: bytes, ob_read: bytes, print_fn): + print_fn( + f'{"Reference": <20} {"Device": <20} {"Diff Reference": <20} {"Diff Device": <20}' + ) + + # Split into 8 byte, word + word + for i in range(0, len(ob_reference), 8): + ref = ob_reference[i : i + 8] + read = ob_read[i : i + 8] + + diff_str1 = "" + diff_str2 = "" + for j in range(0, len(ref.hex()), 2): + byte_str_1 = ref.hex()[j : j + 2] + byte_str_2 = read.hex()[j : j + 2] + + if byte_str_1 == byte_str_2: + diff_str1 += "__" + diff_str2 += "__" + else: + diff_str1 += byte_str_1 + diff_str2 += byte_str_2 + + print_fn( + f"{ref.hex(): <20} {read.hex(): <20} {diff_str1: <20} {diff_str2: <20}" + ) + + def option_bytes_validate(self, file_path: str) -> bool: + # Registers + stm32 = STM32WB55() + + # OpenOCD + self.openocd.start() + stm32.reset(self.openocd, stm32.RunMode.Init) + + # Generate Option Bytes data + ob_data = OptionBytesData(file_path) + ob_values = ob_data.gen_values().export() + ob_reference = ob_values.reference + ob_compare_mask = ob_values.compare_mask + ob_length = len(ob_reference) + ob_words = int(ob_length / 4) + + # Read Option Bytes + ob_read = bytes() + for i in range(ob_words): + addr = stm32.OPTION_BYTE_BASE + i * 4 + value = self.openocd.read_32(addr) + ob_read += value.to_bytes(4, "little") + + # Compare Option Bytes with reference by mask + ob_compare = bytes() + for i in range(ob_length): + ob_compare += bytes([ob_read[i] & ob_compare_mask[i]]) + + # Compare Option Bytes + return_code = False + + if ob_reference == ob_compare: + self.logger.info("Option Bytes are valid") + return_code = True + else: + self.logger.error("Option Bytes are invalid") + self._ob_print_diff_table(ob_reference, ob_compare, self.logger.error) + + # Stop OpenOCD + stm32.reset(self.openocd, stm32.RunMode.Run) + self.openocd.stop() + + return return_code + + def _unpack_u32(self, data: bytes, offset: int): + return int.from_bytes(data[offset : offset + 4], "little") + + def option_bytes_set(self, file_path: str) -> bool: + # Registers + stm32 = STM32WB55() + + # OpenOCD + self.openocd.start() + stm32.reset(self.openocd, stm32.RunMode.Init) + + # Generate Option Bytes data + ob_data = OptionBytesData(file_path) + ob_values = ob_data.gen_values().export() + ob_reference_bytes = ob_values.reference + ob_compare_mask_bytes = ob_values.compare_mask + ob_write_mask_bytes = ob_values.write_mask + ob_length = len(ob_reference_bytes) + ob_dwords = int(ob_length / 8) + + # Clear flash errors + stm32.clear_flash_errors(self.openocd) + + # Unlock Flash and Option Bytes + stm32.flash_unlock(self.openocd) + stm32.option_bytes_unlock(self.openocd) + + ob_need_to_apply = False + + for i in range(ob_dwords): + device_addr = stm32.OPTION_BYTE_BASE + i * 8 + device_value = self.openocd.read_32(device_addr) + ob_write_mask = self._unpack_u32(ob_write_mask_bytes, i * 8) + ob_compare_mask = self._unpack_u32(ob_compare_mask_bytes, i * 8) + ob_value_ref = self._unpack_u32(ob_reference_bytes, i * 8) + ob_value_masked = device_value & ob_compare_mask + + need_patch = ((ob_value_masked ^ ob_value_ref) & ob_write_mask) != 0 + if need_patch: + ob_need_to_apply = True + + self.logger.info( + f"Need to patch: {device_addr:08X}: {ob_value_masked:08X} != {ob_value_ref:08X}, REG[{i}]" + ) + + # Check if this option byte (dword) is mapped to a register + device_reg_addr = stm32.option_bytes_id_to_address(i) + + # Construct new value for the OB register + ob_value = device_value & (~ob_write_mask) + ob_value |= ob_value_ref & ob_write_mask + + self.logger.info(f"Writing {ob_value:08X} to {device_reg_addr:08X}") + self.openocd.write_32(device_reg_addr, ob_value) + + if ob_need_to_apply: + stm32.option_bytes_apply(self.openocd) + else: + self.logger.info(f"Option Bytes are already correct") + + # Load Option Bytes + # That will reset and also lock the Option Bytes and the Flash + stm32.option_bytes_load(self.openocd) + + # Stop OpenOCD + stm32.reset(self.openocd, stm32.RunMode.Run) + self.openocd.stop() + + return True + + def otp_write(self, address: int, file_path: str) -> bool: + # Open file, check that it aligned to 8 bytes + with open(file_path, "rb") as f: + data = f.read() + if len(data) % 8 != 0: + self.logger.error(f"File {file_path} is not aligned to 8 bytes") + return False + + # Check that address is aligned to 8 bytes + if address % 8 != 0: + self.logger.error(f"Address {address} is not aligned to 8 bytes") + return False + + # Get size of data + data_size = len(data) + + # Check that data size is aligned to 8 bytes + if data_size % 8 != 0: + self.logger.error(f"Data size {data_size} is not aligned to 8 bytes") + return False + + self.logger.debug(f"Writing {data_size} bytes to OTP at {address:08X}") + self.logger.debug(f"Data: {data.hex().upper()}") + + # Start OpenOCD + oocd = self.openocd + oocd.start() + + # Registers + stm32 = STM32WB55() + + try: + # Check that OTP is empty for the given address + # Also check that data is already written + already_written = True + for i in range(0, data_size, 4): + file_word = int.from_bytes(data[i : i + 4], "little") + device_word = oocd.read_32(address + i) + if device_word != 0xFFFFFFFF and device_word != file_word: + self.logger.error( + f"OTP memory at {address + i:08X} is not empty: {device_word:08X}" + ) + raise Exception("OTP memory is not empty") + + if device_word != file_word: + already_written = False + + if already_written: + self.logger.info(f"OTP memory is already written with the given data") + return True + + self.reset(self.RunMode.Stop) + stm32.clear_flash_errors(oocd) + + # Write OTP memory by 8 bytes + for i in range(0, data_size, 8): + word_1 = int.from_bytes(data[i : i + 4], "little") + word_2 = int.from_bytes(data[i + 4 : i + 8], "little") + self.logger.debug( + f"Writing {word_1:08X} {word_2:08X} to {address + i:08X}" + ) + stm32.write_flash_64(oocd, address + i, word_1, word_2) + + # Validate OTP memory + validation_result = True + + for i in range(0, data_size, 4): + file_word = int.from_bytes(data[i : i + 4], "little") + device_word = oocd.read_32(address + i) + if file_word != device_word: + self.logger.error( + f"Validation failed: {file_word:08X} != {device_word:08X} at {address + i:08X}" + ) + validation_result = False + finally: + # Stop OpenOCD + stm32.reset(oocd, stm32.RunMode.Run) + oocd.stop() + + return validation_result diff --git a/scripts/flipper/utils/register.py b/scripts/flipper/utils/register.py new file mode 100644 index 00000000..26d66730 --- /dev/null +++ b/scripts/flipper/utils/register.py @@ -0,0 +1,95 @@ +from dataclasses import dataclass +from flipper.utils.openocd import OpenOCD + + +@dataclass +class RegisterBitDefinition: + name: str + offset: int + size: int + value: int = 0 + + +class Register32: + def __init__(self, address: int, definition_list: list[RegisterBitDefinition]): + self.__dict__["names"] = [definition.name for definition in definition_list] + self.names = [definition.name for definition in definition_list] # typecheck + self.address = address + self.definition_list = definition_list + + # Validate that the definitions are not overlapping + for i in range(len(definition_list)): + for j in range(i + 1, len(definition_list)): + if self._is_overlapping(definition_list[i], definition_list[j]): + raise ValueError("Register definitions are overlapping") + + self.freezed = True + + def _is_overlapping( + self, a: RegisterBitDefinition, b: RegisterBitDefinition + ) -> bool: + if a.offset + a.size <= b.offset: + return False + if b.offset + b.size <= a.offset: + return False + return True + + def _get_definition(self, name: str) -> RegisterBitDefinition: + for definition in self.definition_list: + if definition.name == name: + return definition + raise ValueError(f"Register definition '{name}' not found") + + def get_definition_list(self) -> list[RegisterBitDefinition]: + return self.definition_list + + def get_address(self) -> int: + return self.address + + def set_reg_value(self, name: str, value: int): + definition = self._get_definition(name) + if value > (1 << definition.size) - 1: + raise ValueError( + f"Value {value} is too large for register definition '{name}'" + ) + definition.value = value + + def get_reg_value(self, name: str) -> int: + definition = self._get_definition(name) + return definition.value + + def __getattr__(self, attr): + if str(attr) in self.names: + return self.get_reg_value(str(attr)) + else: + return self.__dict__[attr] + + def __setattr__(self, attr, value): + if str(attr) in self.names: + self.set_reg_value(str(attr), value) + else: + if attr in self.__dict__ or "freezed" not in self.__dict__: + self.__dict__[attr] = value + else: + raise AttributeError(f"Attribute '{attr}' not found") + + def __dir__(self): + return self.names + + def set(self, value: int): + for definition in self.definition_list: + definition.value = (value >> definition.offset) & ( + (1 << definition.size) - 1 + ) + + def get(self) -> int: + value = 0 + for definition in self.definition_list: + value |= definition.value << definition.offset + return value + + def load(self, openocd: OpenOCD): + self.set(openocd.read_32(self.address)) + + def store(self, openocd: OpenOCD): + openocd.write_32(self.address, self.get()) diff --git a/scripts/flipper/utils/stm32wb55.py b/scripts/flipper/utils/stm32wb55.py new file mode 100644 index 00000000..910b0d7d --- /dev/null +++ b/scripts/flipper/utils/stm32wb55.py @@ -0,0 +1,352 @@ +import logging +from enum import Enum + +from flipper.utils.openocd import OpenOCD +from flipper.utils.register import Register32, RegisterBitDefinition + + +class STM32WB55: + # Address of OTP memory in flash + OTP_BASE = 0x1FFF7000 + + # Address of Option byte in flash + OPTION_BYTE_BASE = 0x1FFF8000 + + # Flash base address + FLASH_BASE = 0x58004000 + + # Flash unlock register + FLASH_KEYR = FLASH_BASE + 0x08 + + # Option byte unlock register + FLASH_OPTKEYR = FLASH_BASE + 0x0C + + # Flash unlock keys + FLASH_UNLOCK_KEY1 = 0x45670123 + FLASH_UNLOCK_KEY2 = 0xCDEF89AB + + # Option byte unlock keys + FLASH_UNLOCK_OPTKEY1 = 0x08192A3B + FLASH_UNLOCK_OPTKEY2 = 0x4C5D6E7F + + # Flash control register + FLASH_CR = Register32( + FLASH_BASE + 0x14, + [ + RegisterBitDefinition("PG", 0, 1), + RegisterBitDefinition("PER", 1, 1), + RegisterBitDefinition("MER", 2, 1), + RegisterBitDefinition("PNB", 3, 8), + RegisterBitDefinition("_", 11, 5), + RegisterBitDefinition("STRT", 16, 1), + RegisterBitDefinition("OPT_STRT", 17, 1), + RegisterBitDefinition("FSTPG", 18, 1), + RegisterBitDefinition("_", 19, 5), + RegisterBitDefinition("EOPIE", 24, 1), + RegisterBitDefinition("ERRIE", 25, 1), + RegisterBitDefinition("RD_ERRIE", 26, 1), + RegisterBitDefinition("OBL_LAUNCH", 27, 1), + RegisterBitDefinition("_", 28, 2), + RegisterBitDefinition("OPT_LOCK", 30, 1), + RegisterBitDefinition("LOCK", 31, 1), + ], + ) + + # Flash status register + FLASH_SR = Register32( + FLASH_BASE + 0x10, + [ + RegisterBitDefinition("EOP", 0, 1), + RegisterBitDefinition("OP_ERR", 1, 1), + RegisterBitDefinition("_", 2, 1), + RegisterBitDefinition("PROG_ERR", 3, 1), + RegisterBitDefinition("WRP_ERR", 4, 1), + RegisterBitDefinition("PGA_ERR", 5, 1), + RegisterBitDefinition("SIZE_ERR", 6, 1), + RegisterBitDefinition("PGS_ERR", 7, 1), + RegisterBitDefinition("MISS_ERR", 8, 1), + RegisterBitDefinition("FAST_ERR", 9, 1), + RegisterBitDefinition("_", 10, 3), + RegisterBitDefinition("OPTNV", 13, 1), + RegisterBitDefinition("RD_ERR", 14, 1), + RegisterBitDefinition("OPTV_ERR", 15, 1), + RegisterBitDefinition("BSY", 16, 1), + RegisterBitDefinition("_", 17, 1), + RegisterBitDefinition("CFGBSY", 18, 1), + RegisterBitDefinition("PESD", 19, 1), + RegisterBitDefinition("_", 20, 12), + ], + ) + + # Option byte registers + FLASH_OPTR = FLASH_BASE + 0x20 + FLASH_PCROP1ASR = FLASH_BASE + 0x24 + FLASH_PCROP1AER = FLASH_BASE + 0x28 + FLASH_WRP1AR = FLASH_BASE + 0x2C + FLASH_WRP1BR = FLASH_BASE + 0x30 + FLASH_PCROP1BSR = FLASH_BASE + 0x34 + FLASH_PCROP1BER = FLASH_BASE + 0x38 + FLASH_IPCCBR = FLASH_BASE + 0x3C + + # Map option byte dword index to register address + OPTION_BYTE_MAP_TO_REGS = { + 0: FLASH_OPTR, + 1: FLASH_PCROP1ASR, + 2: FLASH_PCROP1AER, + 3: FLASH_WRP1AR, + 4: FLASH_WRP1BR, + 5: FLASH_PCROP1BSR, + 6: FLASH_PCROP1BER, + 7: None, # Invalid Options + 8: None, # Invalid Options + 9: None, # Invalid Options + 10: None, # Invalid Options + 11: None, # Invalid Options + 12: None, # Invalid Options + 13: FLASH_IPCCBR, + 14: None, # Secure Flash + 15: None, # Core 2 Options + } + + def __init__(self): + self.logger = logging.getLogger("STM32WB55") + + class RunMode(Enum): + Init = "init" + Run = "run" + Halt = "halt" + + def reset(self, oocd: OpenOCD, mode: RunMode): + self.logger.debug("Resetting device") + oocd.send_tcl(f"reset {mode.value}") + + def clear_flash_errors(self, oocd: OpenOCD): + # Errata 2.2.9: Flash OPTVERR flag is always set after system reset + # And also clear all other flash error flags + self.logger.debug(f"Resetting flash errors") + self.FLASH_SR.load(oocd) + self.FLASH_SR.OP_ERR = 1 + self.FLASH_SR.PROG_ERR = 1 + self.FLASH_SR.WRP_ERR = 1 + self.FLASH_SR.PGA_ERR = 1 + self.FLASH_SR.SIZE_ERR = 1 + self.FLASH_SR.PGS_ERR = 1 + self.FLASH_SR.MISS_ERR = 1 + self.FLASH_SR.FAST_ERR = 1 + self.FLASH_SR.RD_ERR = 1 + self.FLASH_SR.OPTV_ERR = 1 + self.FLASH_SR.store(oocd) + + def flash_unlock(self, oocd: OpenOCD): + # Check if flash is already unlocked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.LOCK == 0: + self.logger.debug("Flash is already unlocked") + return + + # Unlock flash + self.logger.debug("Unlocking Flash") + oocd.write_32(self.FLASH_KEYR, self.FLASH_UNLOCK_KEY1) + oocd.write_32(self.FLASH_KEYR, self.FLASH_UNLOCK_KEY2) + + # Check if flash is unlocked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.LOCK == 0: + self.logger.debug("Flash unlocked") + else: + self.logger.error("Flash unlock failed") + raise Exception("Flash unlock failed") + + def option_bytes_unlock(self, oocd: OpenOCD): + # Check if options is already unlocked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.OPT_LOCK == 0: + self.logger.debug("Options is already unlocked") + return + + # Unlock options + self.logger.debug("Unlocking Options") + oocd.write_32(self.FLASH_OPTKEYR, self.FLASH_UNLOCK_OPTKEY1) + oocd.write_32(self.FLASH_OPTKEYR, self.FLASH_UNLOCK_OPTKEY2) + + # Check if options is unlocked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.OPT_LOCK == 0: + self.logger.debug("Options unlocked") + else: + self.logger.error("Options unlock failed") + raise Exception("Options unlock failed") + + def option_bytes_lock(self, oocd: OpenOCD): + # Check if options is already locked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.OPT_LOCK == 1: + self.logger.debug("Options is already locked") + return + + # Lock options + self.logger.debug("Locking Options") + self.FLASH_CR.OPT_LOCK = 1 + self.FLASH_CR.store(oocd) + + # Check if options is locked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.OPT_LOCK == 1: + self.logger.debug("Options locked") + else: + self.logger.error("Options lock failed") + raise Exception("Options lock failed") + + def flash_lock(self, oocd: OpenOCD): + # Check if flash is already locked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.LOCK == 1: + self.logger.debug("Flash is already locked") + return + + # Lock flash + self.logger.debug("Locking Flash") + self.FLASH_CR.LOCK = 1 + self.FLASH_CR.store(oocd) + + # Check if flash is locked + self.FLASH_CR.load(oocd) + if self.FLASH_CR.LOCK == 1: + self.logger.debug("Flash locked") + else: + self.logger.error("Flash lock failed") + raise Exception("Flash lock failed") + + def option_bytes_apply(self, oocd: OpenOCD): + self.logger.debug(f"Applying Option Bytes") + + self.FLASH_CR.load(oocd) + self.FLASH_CR.OPT_STRT = 1 + self.FLASH_CR.store(oocd) + + # Wait for Option Bytes to be applied + self.flash_wait_for_operation(oocd) + + def option_bytes_load(self, oocd: OpenOCD): + self.logger.debug(f"Loading Option Bytes") + self.FLASH_CR.load(oocd) + self.FLASH_CR.OBL_LAUNCH = 1 + self.FLASH_CR.store(oocd) + + def option_bytes_id_to_address(self, id: int) -> int: + # Check if this option byte (dword) is mapped to a register + device_reg_addr = self.OPTION_BYTE_MAP_TO_REGS.get(id, None) + if device_reg_addr is None: + raise Exception(f"Option Byte {id} is not mapped to a register") + + return device_reg_addr + + def flash_wait_for_operation(self, oocd: OpenOCD): + # Wait for flash operation to complete + # TODO: timeout + while True: + self.FLASH_SR.load(oocd) + if self.FLASH_SR.BSY == 0: + break + + def flash_dump_status_register(self, oocd: OpenOCD): + self.FLASH_SR.load(oocd) + self.logger.info(f"FLASH_SR: {self.FLASH_SR.get():08x}") + if self.FLASH_SR.EOP: + self.logger.info(" End of operation") + if self.FLASH_SR.OP_ERR: + self.logger.error(" Operation error") + if self.FLASH_SR.PROG_ERR: + self.logger.error(" Programming error") + if self.FLASH_SR.WRP_ERR: + self.logger.error(" Write protection error") + if self.FLASH_SR.PGA_ERR: + self.logger.error(" Programming alignment error") + if self.FLASH_SR.SIZE_ERR: + self.logger.error(" Size error") + if self.FLASH_SR.PGS_ERR: + self.logger.error(" Programming sequence error") + if self.FLASH_SR.MISS_ERR: + self.logger.error(" Fast programming data miss error") + if self.FLASH_SR.FAST_ERR: + self.logger.error(" Fast programming error") + if self.FLASH_SR.OPTNV: + self.logger.info(" User option OPTVAL indication") + if self.FLASH_SR.RD_ERR: + self.logger.info(" PCROP read error") + if self.FLASH_SR.OPTV_ERR: + self.logger.info(" Option and Engineering bits loading validity error") + if self.FLASH_SR.BSY: + self.logger.info(" Busy") + if self.FLASH_SR.CFGBSY: + self.logger.info(" Programming or erase configuration busy") + if self.FLASH_SR.PESD: + self.logger.info(" Programming / erase operation suspended.") + + def write_flash_64(self, oocd: OpenOCD, address: int, word_1: int, word_2: int): + self.logger.debug(f"Writing flash at address {address:08x}") + + if address % 8 != 0: + self.logger.error("Address must be aligned to 8 bytes") + raise Exception("Address must be aligned to 8 bytes") + + if word_1 == oocd.read_32(address) and word_2 == oocd.read_32(address + 4): + self.logger.debug("Data is already programmed") + return + + self.flash_unlock(oocd) + + # Check that no flash main memory operation is ongoing by checking the BSY bit + self.FLASH_SR.load(oocd) + if self.FLASH_SR.BSY: + self.logger.error("Flash is busy") + self.flash_dump_status_register(oocd) + raise Exception("Flash is busy") + + # Enable end of operation interrupts and error interrupts + self.FLASH_CR.load(oocd) + self.FLASH_CR.EOPIE = 1 + self.FLASH_CR.ERRIE = 1 + self.FLASH_CR.store(oocd) + + # Check that flash memory program and erase operations are allowed + if self.FLASH_SR.PESD: + self.logger.error("Flash operations are not allowed") + self.flash_dump_status_register(oocd) + raise Exception("Flash operations are not allowed") + + # Check and clear all error programming flags due to a previous programming. + self.clear_flash_errors(oocd) + + # Set the PG bit in the Flash memory control register (FLASH_CR) + self.FLASH_CR.load(oocd) + self.FLASH_CR.PG = 1 + self.FLASH_CR.store(oocd) + + # Perform the data write operation at the desired memory address, only double word (64 bits) can be programmed. + # Write the first word + oocd.send_tcl(f"mww 0x{address:08x} 0x{word_1:08x}") + # Write the second word + oocd.send_tcl(f"mww 0x{(address + 4):08x} 0x{word_2:08x}") + + # Wait for the BSY bit to be cleared + self.flash_wait_for_operation(oocd) + + # Check that EOP flag is set in the FLASH_SR register + self.FLASH_SR.load(oocd) + if not self.FLASH_SR.EOP: + self.logger.error("Flash operation failed") + self.flash_dump_status_register(oocd) + raise Exception("Flash operation failed") + + # Clear the EOP flag + self.FLASH_SR.load(oocd) + self.FLASH_SR.EOP = 1 + self.FLASH_SR.store(oocd) + + # Clear the PG bit in the FLASH_CR register + self.FLASH_CR.load(oocd) + self.FLASH_CR.PG = 0 + self.FLASH_CR.store(oocd) + + self.flash_lock(oocd) diff --git a/scripts/ob.py b/scripts/ob.py index 722549d6..178fe16a 100755 --- a/scripts/ob.py +++ b/scripts/ob.py @@ -1,69 +1,79 @@ #!/usr/bin/env python3 -import logging -import argparse -import subprocess -import sys -import os +from os import path from flipper.app import App -from flipper.cube import CubeProgrammer +from flipper.utils.programmer_openocd import OpenOCDProgrammer class Main(App): def init(self): + # Subparsers self.subparsers = self.parser.add_subparsers(help="sub-command help") + + # Check command self.parser_check = self.subparsers.add_parser( "check", help="Check Option Bytes" ) - self._addArgsSWD(self.parser_check) + self._add_args(self.parser_check) self.parser_check.set_defaults(func=self.check) + # Set command self.parser_set = self.subparsers.add_parser("set", help="Set Option Bytes") - self._addArgsSWD(self.parser_set) + self._add_args(self.parser_set) self.parser_set.set_defaults(func=self.set) - # OB - self.ob = {} - def _addArgsSWD(self, parser): + def _add_args(self, parser): parser.add_argument( - "--port", type=str, help="Port to connect: swd or usb1", default="swd" + "--port-base", type=int, help="OpenOCD port base", default=3333 + ) + parser.add_argument( + "--interface", + type=str, + help="OpenOCD interface", + default="interface/cmsis-dap.cfg", + ) + parser.add_argument( + "--serial", type=str, help="OpenOCD interface serial number" + ) + parser.add_argument( + "--ob-path", + type=str, + help="Option bytes file", + default=path.join(path.dirname(__file__), "ob.data"), ) - parser.add_argument("--serial", type=str, help="ST-Link Serial Number") - - def _getCubeParams(self): - return { - "port": self.args.port, - "serial": self.args.serial, - } - - def before(self): - self.logger.info(f"Loading Option Bytes data") - file_path = os.path.join(os.path.dirname(sys.argv[0]), "ob.data") - with open(file_path, "r") as file: - for line in file.readlines(): - k, v, o = line.split(":") - self.ob[k.strip()] = v.strip(), o.strip() def check(self): self.logger.info(f"Checking Option Bytes") - cp = CubeProgrammer(self._getCubeParams()) - if cp.checkOptionBytes(self.ob): - self.logger.info(f"OB Check OK") - return 0 - else: - self.logger.error(f"OB Check FAIL") - return 255 + + # OpenOCD + openocd = OpenOCDProgrammer( + self.args.interface, + self.args.port_base, + self.args.serial, + ) + + return_code = 1 + if openocd.option_bytes_validate(self.args.ob_path): + return_code = 0 + + return return_code def set(self): self.logger.info(f"Setting Option Bytes") - cp = CubeProgrammer(self._getCubeParams()) - if cp.setOptionBytes(self.ob): - self.logger.info(f"OB Set OK") - return 0 - else: - self.logger.error(f"OB Set FAIL") - return 255 + + # OpenOCD + openocd = OpenOCDProgrammer( + self.args.interface, + self.args.port_base, + self.args.serial, + ) + + return_code = 1 + if openocd.option_bytes_set(self.args.ob_path): + return_code = 0 + + return return_code if __name__ == "__main__": diff --git a/scripts/otp.py b/scripts/otp.py index 48056fd2..3bfe30d2 100755 --- a/scripts/otp.py +++ b/scripts/otp.py @@ -35,6 +35,7 @@ OTP_DISPLAYS = { from flipper.app import App from flipper.cube import CubeProgrammer +from flipper.utils.programmer_openocd import OpenOCDProgrammer class Main(App): @@ -53,21 +54,21 @@ class Main(App): self.parser_flash_first = self.subparsers.add_parser( "flash_first", help="Flash first block of OTP to device" ) - self._addArgsSWD(self.parser_flash_first) + self._addArgsOpenOCD(self.parser_flash_first) self._addFirstArgs(self.parser_flash_first) self.parser_flash_first.set_defaults(func=self.flash_first) # Flash Second self.parser_flash_second = self.subparsers.add_parser( "flash_second", help="Flash second block of OTP to device" ) - self._addArgsSWD(self.parser_flash_second) + self._addArgsOpenOCD(self.parser_flash_second) self._addSecondArgs(self.parser_flash_second) self.parser_flash_second.set_defaults(func=self.flash_second) # Flash All self.parser_flash_all = self.subparsers.add_parser( "flash_all", help="Flash OTP to device" ) - self._addArgsSWD(self.parser_flash_all) + self._addArgsOpenOCD(self.parser_flash_all) self._addFirstArgs(self.parser_flash_all) self._addSecondArgs(self.parser_flash_all) self.parser_flash_all.set_defaults(func=self.flash_all) @@ -75,17 +76,19 @@ class Main(App): self.logger = logging.getLogger() self.timestamp = datetime.datetime.now().timestamp() - def _addArgsSWD(self, parser): + def _addArgsOpenOCD(self, parser): parser.add_argument( - "--port", type=str, help="Port to connect: swd or usb1", default="swd" + "--port-base", type=int, help="OpenOCD port base", default=3333 + ) + parser.add_argument( + "--interface", + type=str, + help="OpenOCD interface", + default="interface/cmsis-dap.cfg", + ) + parser.add_argument( + "--serial", type=str, help="OpenOCD interface serial number" ) - parser.add_argument("--serial", type=str, help="ST-Link Serial Number") - - def _getCubeParams(self): - return { - "port": self.args.port, - "serial": self.args.serial, - } def _addFirstArgs(self, parser): parser.add_argument("--version", type=int, help="Version", required=True) @@ -173,14 +176,22 @@ class Main(App): file.write(self._packFirst()) self.logger.info(f"Flashing OTP") - cp = CubeProgrammer(self._getCubeParams()) - cp.flashBin("0x1FFF7000", filename) - cp.resetTarget() + + openocd = OpenOCDProgrammer( + self.args.interface, + self.args.port_base, + self.args.serial, + ) + + if not openocd.otp_write(0x1FFF7000, filename): + raise Exception("Failed to flash OTP") + self.logger.info(f"Flashed Successfully") - os.remove(filename) except Exception as e: self.logger.exception(e) return 1 + finally: + os.remove(filename) return 0 @@ -197,14 +208,22 @@ class Main(App): file.write(self._packSecond()) self.logger.info(f"Flashing OTP") - cp = CubeProgrammer(self._getCubeParams()) - cp.flashBin("0x1FFF7010", filename) - cp.resetTarget() + + openocd = OpenOCDProgrammer( + self.args.interface, + self.args.port_base, + self.args.serial, + ) + + if not openocd.otp_write(0x1FFF7010, filename): + raise Exception("Failed to flash OTP") + self.logger.info(f"Flashed Successfully") - os.remove(filename) except Exception as e: self.logger.exception(e) return 1 + finally: + os.remove(filename) return 0 @@ -223,14 +242,22 @@ class Main(App): file.write(self._packSecond()) self.logger.info(f"Flashing OTP") - cp = CubeProgrammer(self._getCubeParams()) - cp.flashBin("0x1FFF7000", filename) - cp.resetTarget() + + openocd = OpenOCDProgrammer( + self.args.interface, + self.args.port_base, + self.args.serial, + ) + + if not openocd.otp_write(0x1FFF7000, filename): + raise Exception("Failed to flash OTP") + self.logger.info(f"Flashed Successfully") - os.remove(filename) except Exception as e: self.logger.exception(e) return 1 + finally: + os.remove(filename) return 0