[FL-3097] fbt, faploader: minimal app module implementation (#2420)

* fbt, faploader: minimal app module implementation
* faploader, libs: moved API hashtable core to flipper_application
* example: compound api
* lib: flipper_application: naming fixes, doxygen comments
* fbt: changed `requires` manifest field behavior for app extensions
* examples: refactored plugin apps; faploader: changed new API naming; fbt: changed PLUGIN app type meaning
* loader: dropped support for debug apps & plugin menus
* moved applications/plugins -> applications/external
* Restored x bit on chiplist_convert.py
* git: fixed free-dap submodule path
* pvs: updated submodule paths
* examples: example_advanced_plugins.c: removed potential memory leak on errors
* examples: example_plugins: refined requires
* fbt: not deploying app modules for debug/sample apps; extra validation for .PLUGIN-type apps
* apps: removed cdefines for external apps
* fbt: moved ext app path definition
* fbt: reworked fap_dist handling; f18: synced api_symbols.csv
* fbt: removed resources_paths for extapps
* scripts: reworked storage
* scripts: reworked runfap.py & selfupdate.py to use new api
* wip: fal runner
* fbt: moved file packaging into separate module
* scripts: storage: fixes
* scripts: storage: minor fixes for new api
* fbt: changed internal artifact storage details for external apps
* scripts: storage: additional fixes and better error reporting; examples: using APP_DATA_PATH()
* fbt, scripts: reworked launch_app to deploy plugins; moved old runfap.py to distfap.py
* fbt: extra check for plugins descriptors
* fbt: additional checks in emitter
* fbt: better info message on SDK rebuild
* scripts: removed requirements.txt
* loader: removed remnants of plugins & debug menus
* post-review fixes
This commit is contained in:
hedger
2023-03-14 18:29:28 +04:00
committed by GitHub
parent 4bd3dca16f
commit 53435579b3
376 changed files with 2041 additions and 1036 deletions

71
scripts/distfap.py Normal file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
from flipper.app import App
from flipper.storage import FlipperStorage, FlipperStorageOperations
from flipper.utils.cdc import resolve_port
import os
import posixpath
class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-n",
"--no-launch",
dest="launch_app",
action="store_false",
help="Don't launch app",
)
self.parser.add_argument("fap_src_path", help="App file to upload")
self.parser.add_argument(
"--fap_dst_dir", help="Upload path", default="/ext/apps", required=False
)
self.parser.set_defaults(func=self.install)
def install(self):
if not (port := resolve_port(self.logger, self.args.port)):
return 1
try:
with FlipperStorage(port) as storage:
storage_ops = FlipperStorageOperations(storage)
fap_local_path = self.args.fap_src_path
self.args.fap_dst_dir = self.args.fap_dst_dir.rstrip("/\\")
if not os.path.isfile(fap_local_path):
self.logger.error(
f"Error: source .fap ({fap_local_path}) not found"
)
return 2
fap_dst_path = posixpath.join(
self.args.fap_dst_dir, os.path.basename(fap_local_path)
)
self.logger.info(f'Installing "{fap_local_path}" to {fap_dst_path}')
storage_ops.recursive_send(fap_dst_path, fap_local_path, False)
if not self.args.launch_app:
return 0
storage.send_and_wait_eol(
f'loader open "Applications" {fap_dst_path}\r'
)
if len(result := storage.read.until(storage.CLI_EOL)):
self.logger.error(f"Unexpected response: {result.decode('ascii')}")
return 3
return 0
except Exception as e:
self.logger.error(f"Error: {e}")
# raise
return 4
if __name__ == "__main__":
Main()()

View File

@@ -12,13 +12,13 @@ class FlipperAppType(Enum):
SERVICE = "Service"
SYSTEM = "System"
APP = "App"
PLUGIN = "Plugin"
DEBUG = "Debug"
ARCHIVE = "Archive"
SETTINGS = "Settings"
STARTUP = "StartupHook"
EXTERNAL = "External"
METAPACKAGE = "Package"
PLUGIN = "Plugin"
@dataclass
@@ -69,12 +69,22 @@ class FlipperApplication:
fap_private_libs: List[Library] = field(default_factory=list)
fap_file_assets: Optional[str] = None
# Internally used by fbt
_appmanager: Optional["AppManager"] = None
_appdir: Optional[object] = None
_apppath: Optional[str] = None
_plugins: List["FlipperApplication"] = field(default_factory=list)
def supports_hardware_target(self, target: str):
return target in self.targets or "all" in self.targets
@property
def is_default_deployable(self):
return self.apptype != FlipperAppType.DEBUG and self.fap_category != "Examples"
def __post_init__(self):
if self.apptype == FlipperAppType.PLUGIN:
self.stack_size = 0
class AppManager:
def __init__(self):
@@ -94,6 +104,23 @@ class AppManager:
return app
return None
def _validate_app_params(self, *args, **kw):
apptype = kw.get("apptype")
if apptype == FlipperAppType.PLUGIN:
if kw.get("stack_size"):
raise FlipperManifestException(
f"Plugin {kw.get('appid')} cannot have stack (did you mean FlipperAppType.EXTERNAL?)"
)
if not kw.get("requires"):
raise FlipperManifestException(
f"Plugin {kw.get('appid')} must have 'requires' in manifest"
)
# Harmless - cdefines for external apps are meaningless
# if apptype == FlipperAppType.EXTERNAL and kw.get("cdefines"):
# raise FlipperManifestException(
# f"External app {kw.get('appid')} must not have 'cdefines' in manifest"
# )
def load_manifest(self, app_manifest_path: str, app_dir_node: object):
if not os.path.exists(app_manifest_path):
raise FlipperManifestException(
@@ -105,12 +132,14 @@ class AppManager:
def App(*args, **kw):
nonlocal app_manifests
self._validate_app_params(*args, **kw)
app_manifests.append(
FlipperApplication(
*args,
**kw,
_appdir=app_dir_node,
_apppath=os.path.dirname(app_manifest_path),
_appmanager=self,
),
)
@@ -155,7 +184,6 @@ class AppBuildset:
FlipperAppType.SERVICE,
FlipperAppType.SYSTEM,
FlipperAppType.APP,
FlipperAppType.PLUGIN,
FlipperAppType.DEBUG,
FlipperAppType.ARCHIVE,
FlipperAppType.SETTINGS,
@@ -182,6 +210,7 @@ class AppBuildset:
self._check_conflicts()
self._check_unsatisfied() # unneeded?
self._check_target_match()
self._group_plugins()
self.apps = sorted(
list(map(self.appmgr.get, self.appnames)),
key=lambda app: app.appid,
@@ -260,6 +289,18 @@ class AppBuildset:
f"Apps incompatible with target {self.hw_target}: {', '.join(incompatible)}"
)
def _group_plugins(self):
known_extensions = self.get_apps_of_type(FlipperAppType.PLUGIN, all_known=True)
for extension_app in known_extensions:
for parent_app_id in extension_app.requires:
try:
parent_app = self.appmgr.get(parent_app_id)
parent_app._plugins.append(extension_app)
except FlipperManifestException as e:
self._writer(
f"Module {extension_app.appid} has unknown parent {parent_app_id}"
)
def get_apps_cdefs(self):
cdefs = set()
for app in self.apps:
@@ -301,7 +342,6 @@ class ApplicationsCGenerator:
FlipperAppType.SERVICE: ("FlipperApplication", "FLIPPER_SERVICES"),
FlipperAppType.SYSTEM: ("FlipperApplication", "FLIPPER_SYSTEM_APPS"),
FlipperAppType.APP: ("FlipperApplication", "FLIPPER_APPS"),
FlipperAppType.PLUGIN: ("FlipperApplication", "FLIPPER_PLUGINS"),
FlipperAppType.DEBUG: ("FlipperApplication", "FLIPPER_DEBUG_APPS"),
FlipperAppType.SETTINGS: ("FlipperApplication", "FLIPPER_SETTINGS_APPS"),
FlipperAppType.STARTUP: ("FlipperOnStartHook", "FLIPPER_ON_SYSTEM_START"),

108
scripts/fbt/fapassets.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import hashlib
import struct
from typing import TypedDict
class File(TypedDict):
path: str
size: int
content_path: str
class Dir(TypedDict):
path: str
class FileBundler:
"""
u32 magic
u32 version
u32 dirs_count
u32 files_count
u32 signature_size
u8[] signature
Dirs:
u32 dir_name length
u8[] dir_name
Files:
u32 file_name length
u8[] file_name
u32 file_content_size
u8[] file_content
"""
def __init__(self, directory_path: str):
self.directory_path = directory_path
self.file_list: list[File] = []
self.directory_list: list[Dir] = []
self._gather()
def _gather(self):
for root, dirs, files in os.walk(self.directory_path):
for file_info in files:
file_path = os.path.join(root, file_info)
file_size = os.path.getsize(file_path)
self.file_list.append(
{
"path": os.path.relpath(file_path, self.directory_path),
"size": file_size,
"content_path": file_path,
}
)
for dir_info in dirs:
dir_path = os.path.join(root, dir_info)
# dir_size = sum(
# os.path.getsize(os.path.join(dir_path, f)) for f in os.listdir(dir_path)
# )
self.directory_list.append(
{
"path": os.path.relpath(dir_path, self.directory_path),
}
)
self.file_list.sort(key=lambda f: f["path"])
self.directory_list.sort(key=lambda d: d["path"])
def export(self, target_path: str):
self._md5_hash = hashlib.md5()
with open(target_path, "wb") as f:
# Write header magic and version
f.write(struct.pack("<II", 0x4F4C5A44, 0x01))
# Write dirs count
f.write(struct.pack("<I", len(self.directory_list)))
# Write files count
f.write(struct.pack("<I", len(self.file_list)))
md5_hash_size = len(self._md5_hash.digest())
# write signature size and null signature, we'll fill it in later
f.write(struct.pack("<I", md5_hash_size))
signature_offset = f.tell()
f.write(b"\x00" * md5_hash_size)
self._write_contents(f)
f.seek(signature_offset)
f.write(self._md5_hash.digest())
def _write_contents(self, f):
for dir_info in self.directory_list:
f.write(struct.pack("<I", len(dir_info["path"]) + 1))
f.write(dir_info["path"].encode("ascii") + b"\x00")
self._md5_hash.update(dir_info["path"].encode("ascii") + b"\x00")
# Write files
for file_info in self.file_list:
f.write(struct.pack("<I", len(file_info["path"]) + 1))
f.write(file_info["path"].encode("ascii") + b"\x00")
f.write(struct.pack("<I", file_info["size"]))
self._md5_hash.update(file_info["path"].encode("ascii") + b"\x00")
with open(file_info["content_path"], "rb") as content_file:
content = content_file.read()
f.write(content)
self._md5_hash.update(content)

View File

@@ -1,94 +1,117 @@
from dataclasses import dataclass, field
from typing import Optional, TypedDict
from SCons.Builder import Builder
from SCons.Action import Action
from SCons.Errors import UserError
from SCons.Node import NodeList
import SCons.Warnings
from fbt.elfmanifest import assemble_manifest_data
from fbt.appmanifest import FlipperApplication, FlipperManifestException, FlipperAppType
from fbt.sdk.cache import SdkCache
from fbt.util import extract_abs_dir_path
import itertools
import os
import pathlib
import itertools
import shutil
import struct
import hashlib
from dataclasses import dataclass, field
from typing import Optional, TypedDict
from ansi.color import fg
import SCons.Warnings
from SCons.Action import Action
from SCons.Builder import Builder
from SCons.Errors import UserError
from SCons.Node import NodeList
from SCons.Node.FS import File, Entry
from fbt.appmanifest import FlipperApplication, FlipperAppType, FlipperManifestException
from fbt.elfmanifest import assemble_manifest_data
from fbt.fapassets import FileBundler
from fbt.sdk.cache import SdkCache
from fbt.util import extract_abs_dir_path
@dataclass
class FlipperExternalAppInfo:
app: FlipperApplication
compact: NodeList = field(default_factory=NodeList)
debug: NodeList = field(default_factory=NodeList)
validator: NodeList = field(default_factory=NodeList)
installer: NodeList = field(default_factory=NodeList)
compact: Optional[File] = None
debug: Optional[File] = None
validator: Optional[Entry] = None
# List of tuples (dist_to_sd, path)
dist_entries: list[tuple[bool, str]] = field(default_factory=list)
def BuildAppElf(env, app):
ext_apps_work_dir = env.subst("$EXT_APPS_WORK_DIR")
app_work_dir = os.path.join(ext_apps_work_dir, app.appid)
class AppBuilder:
def __init__(self, env, app):
self.fw_env = env
self.app = app
self.ext_apps_work_dir = env.subst("$EXT_APPS_WORK_DIR")
self.app_work_dir = os.path.join(self.ext_apps_work_dir, self.app.appid)
self.app_alias = f"fap_{self.app.appid}"
self.externally_built_files = []
self.private_libs = []
env.SetDefault(_APP_ICONS=[])
env.VariantDir(app_work_dir, app._appdir, duplicate=False)
def build(self):
self._setup_app_env()
self._build_external_files()
self._compile_assets()
self._build_private_libs()
return self._build_app()
app_env = env.Clone(FAP_SRC_DIR=app._appdir, FAP_WORK_DIR=app_work_dir)
def _setup_app_env(self):
self.app_env = self.fw_env.Clone(
FAP_SRC_DIR=self.app._appdir, FAP_WORK_DIR=self.app_work_dir
)
self.app_env.VariantDir(self.app_work_dir, self.app._appdir, duplicate=False)
app_alias = f"fap_{app.appid}"
def _build_external_files(self):
if not self.app.fap_extbuild:
return
app_artifacts = FlipperExternalAppInfo(app)
externally_built_files = []
if app.fap_extbuild:
for external_file_def in app.fap_extbuild:
externally_built_files.append(external_file_def.path)
app_env.Alias(app_alias, external_file_def.path)
app_env.AlwaysBuild(
app_env.Command(
for external_file_def in self.app.fap_extbuild:
self.externally_built_files.append(external_file_def.path)
self.app_env.Alias(self.app_alias, external_file_def.path)
self.app_env.AlwaysBuild(
self.app_env.Command(
external_file_def.path,
None,
Action(
external_file_def.command,
"" if app_env["VERBOSE"] else "\tEXTCMD\t${TARGET}",
"" if self.app_env["VERBOSE"] else "\tEXTCMD\t${TARGET}",
),
)
)
if app.fap_icon_assets:
fap_icons = app_env.CompileIcons(
app_env.Dir(app_work_dir),
app._appdir.Dir(app.fap_icon_assets),
icon_bundle_name=f"{app.fap_icon_assets_symbol if app.fap_icon_assets_symbol else app.appid }_icons",
def _compile_assets(self):
if not self.app.fap_icon_assets:
return
fap_icons = self.app_env.CompileIcons(
self.app_env.Dir(self.app_work_dir),
self.app._appdir.Dir(self.app.fap_icon_assets),
icon_bundle_name=f"{self.app.fap_icon_assets_symbol if self.app.fap_icon_assets_symbol else self.app.appid }_icons",
)
app_env.Alias("_fap_icons", fap_icons)
env.Append(_APP_ICONS=[fap_icons])
self.app_env.Alias("_fap_icons", fap_icons)
self.fw_env.Append(_APP_ICONS=[fap_icons])
private_libs = []
def _build_private_libs(self):
for lib_def in self.app.fap_private_libs:
self.private_libs.append(self._build_private_lib(lib_def))
for lib_def in app.fap_private_libs:
lib_src_root_path = os.path.join(app_work_dir, "lib", lib_def.name)
app_env.AppendUnique(
def _build_private_lib(self, lib_def):
lib_src_root_path = os.path.join(self.app_work_dir, "lib", lib_def.name)
self.app_env.AppendUnique(
CPPPATH=list(
app_env.Dir(lib_src_root_path).Dir(incpath).srcnode().rfile().abspath
self.app_env.Dir(lib_src_root_path)
.Dir(incpath)
.srcnode()
.rfile()
.abspath
for incpath in lib_def.fap_include_paths
),
)
lib_sources = list(
itertools.chain.from_iterable(
app_env.GlobRecursive(source_type, lib_src_root_path)
self.app_env.GlobRecursive(source_type, lib_src_root_path)
for source_type in lib_def.sources
)
)
if len(lib_sources) == 0:
raise UserError(f"No sources gathered for private library {lib_def}")
private_lib_env = app_env.Clone()
private_lib_env = self.app_env.Clone()
private_lib_env.AppendUnique(
CCFLAGS=[
*lib_def.cflags,
@@ -96,86 +119,117 @@ def BuildAppElf(env, app):
CPPDEFINES=lib_def.cdefines,
CPPPATH=list(
map(
lambda cpath: extract_abs_dir_path(app._appdir.Dir(cpath)),
lambda cpath: extract_abs_dir_path(self.app._appdir.Dir(cpath)),
lib_def.cincludes,
)
),
)
lib = private_lib_env.StaticLibrary(
os.path.join(app_work_dir, lib_def.name),
return private_lib_env.StaticLibrary(
os.path.join(self.app_work_dir, lib_def.name),
lib_sources,
)
private_libs.append(lib)
app_sources = list(
itertools.chain.from_iterable(
app_env.GlobRecursive(
source_type,
app_work_dir,
exclude="lib",
def _build_app(self):
self.app_env.Append(
LIBS=[*self.app.fap_libs, *self.private_libs],
CPPPATH=self.app_env.Dir(self.app_work_dir),
)
app_sources = list(
itertools.chain.from_iterable(
self.app_env.GlobRecursive(
source_type,
self.app_work_dir,
exclude="lib",
)
for source_type in self.app.sources
)
for source_type in app.sources
)
)
app_env.Append(
LIBS=[*app.fap_libs, *private_libs],
CPPPATH=env.Dir(app_work_dir),
)
app_artifacts = FlipperExternalAppInfo(self.app)
app_artifacts.debug = self.app_env.Program(
os.path.join(self.ext_apps_work_dir, f"{self.app.appid}_d"),
app_sources,
APP_ENTRY=self.app.entry_point,
)[0]
app_artifacts.debug = app_env.Program(
os.path.join(ext_apps_work_dir, f"{app.appid}_d"),
app_sources,
APP_ENTRY=app.entry_point,
)
app_artifacts.compact = self.app_env.EmbedAppMetadata(
os.path.join(self.ext_apps_work_dir, self.app.appid),
app_artifacts.debug,
APP=self.app,
)[0]
app_env.Clean(
app_artifacts.debug, [*externally_built_files, app_env.Dir(app_work_dir)]
)
app_artifacts.validator = self.app_env.ValidateAppImports(
app_artifacts.compact
)[0]
app_elf_dump = app_env.ObjDump(app_artifacts.debug)
app_env.Alias(f"{app_alias}_list", app_elf_dump)
if self.app.apptype == FlipperAppType.PLUGIN:
for parent_app_id in self.app.requires:
fal_path = (
f"apps_data/{parent_app_id}/plugins/{app_artifacts.compact.name}"
)
deployable = True
# If it's a plugin for a non-deployable app, don't include it in the resources
if parent_app := self.app._appmanager.get(parent_app_id):
if not parent_app.is_default_deployable:
deployable = False
app_artifacts.dist_entries.append((deployable, fal_path))
else:
fap_path = f"apps/{self.app.fap_category}/{app_artifacts.compact.name}"
app_artifacts.dist_entries.append(
(self.app.is_default_deployable, fap_path)
)
app_artifacts.compact = app_env.EmbedAppMetadata(
os.path.join(ext_apps_work_dir, app.appid),
app_artifacts.debug,
APP=app,
)
self._configure_deps_and_aliases(app_artifacts)
return app_artifacts
manifest_vals = {
k: v
for k, v in vars(app).items()
if not k.startswith(FlipperApplication.PRIVATE_FIELD_PREFIX)
}
def _configure_deps_and_aliases(self, app_artifacts: FlipperExternalAppInfo):
# Extra things to clean up along with the app
self.app_env.Clean(
app_artifacts.debug,
[*self.externally_built_files, self.app_env.Dir(self.app_work_dir)],
)
app_env.Depends(
app_artifacts.compact,
[app_env["SDK_DEFINITION"], app_env.Value(manifest_vals)],
)
# Create listing of the app
app_elf_dump = self.app_env.ObjDump(app_artifacts.debug)
self.app_env.Alias(f"{self.app_alias}_list", app_elf_dump)
# Add dependencies on icon files
if app.fap_icon:
app_env.Depends(
# Extra dependencies for the app - manifest values, icon file
manifest_vals = {
k: v
for k, v in vars(self.app).items()
if not k.startswith(FlipperApplication.PRIVATE_FIELD_PREFIX)
}
self.app_env.Depends(
app_artifacts.compact,
app_env.File(f"{app._apppath}/{app.fap_icon}"),
[self.app_env["SDK_DEFINITION"], self.app_env.Value(manifest_vals)],
)
if self.app.fap_icon:
self.app_env.Depends(
app_artifacts.compact,
self.app_env.File(f"{self.app._apppath}/{self.app.fap_icon}"),
)
# Add dependencies on file assets
if app.fap_file_assets:
app_env.Depends(
app_artifacts.compact,
app_env.GlobRecursive(
"*",
app._appdir.Dir(app.fap_file_assets),
),
)
# Add dependencies on file assets
if self.app.fap_file_assets:
self.app_env.Depends(
app_artifacts.compact,
self.app_env.GlobRecursive(
"*",
self.app._appdir.Dir(self.app.fap_file_assets),
),
)
app_artifacts.validator = app_env.ValidateAppImports(app_artifacts.compact)
app_env.AlwaysBuild(app_artifacts.validator)
app_env.Alias(app_alias, app_artifacts.validator)
# Always run the validator for the app's binary when building the app
self.app_env.AlwaysBuild(app_artifacts.validator)
self.app_env.Alias(self.app_alias, app_artifacts.validator)
env["EXT_APPS"][app.appid] = app_artifacts
def BuildAppElf(env, app):
app_builder = AppBuilder(env, app)
env["EXT_APPS"][app.appid] = app_artifacts = app_builder.build()
return app_artifacts
@@ -184,7 +238,7 @@ def prepare_app_metadata(target, source, env):
if not sdk_cache.is_buildable():
raise UserError(
"SDK version is not finalized, please review changes and re-run operation"
"SDK version is not finalized, please review changes and re-run operation. See AppsOnSDCard.md for more details."
)
app = env["APP"]
@@ -208,7 +262,7 @@ def validate_app_imports(target, source, env):
unresolved_syms = app_syms - sdk_cache.get_valid_names()
if unresolved_syms:
warning_msg = fg.brightyellow(
f"{source[0].path}: app won't run. Unresolved symbols: "
f"{source[0].path}: app may not be runnable. Symbols not resolved using firmware's API: "
) + fg.brightmagenta(f"{unresolved_syms}")
disabled_api_syms = unresolved_syms.intersection(sdk_cache.get_disabled_names())
if disabled_api_syms:
@@ -220,7 +274,7 @@ def validate_app_imports(target, source, env):
SCons.Warnings.warn(SCons.Warnings.LinkWarning, warning_msg),
def GetExtAppFromPath(env, app_dir):
def GetExtAppByIdOrPath(env, app_dir):
if not app_dir:
raise UserError("APPSRC= not set")
@@ -228,10 +282,10 @@ def GetExtAppFromPath(env, app_dir):
app = None
try:
# Maybe used passed an appid?
# Maybe user passed an appid?
app = appmgr.get(app_dir)
except FlipperManifestException as _:
# Look up path components in known app dits
# Look up path components in known app dirs
for dir_part in reversed(pathlib.Path(app_dir).parts):
if app := appmgr.find_by_appdir(dir_part):
break
@@ -242,47 +296,47 @@ def GetExtAppFromPath(env, app_dir):
app_artifacts = env["EXT_APPS"].get(app.appid, None)
if not app_artifacts:
raise UserError(
f"Application {app.appid} is not configured for building as external"
f"Application {app.appid} is not configured to be built as external"
)
return app_artifacts
def resources_fap_dist_emitter(target, source, env):
target_dir = target[0]
# Initially we have a single target - target dir
# Here we inject pairs of (target, source) for each file
resources_root = target[0]
target = []
for _, app_artifacts in env["EXT_APPS"].items():
# We don't deploy example apps & debug tools with SD card resources
if (
app_artifacts.app.apptype == FlipperAppType.DEBUG
or app_artifacts.app.fap_category == "Examples"
for app_artifacts in env["EXT_APPS"].values():
for _, dist_path in filter(
lambda dist_entry: dist_entry[0], app_artifacts.dist_entries
):
continue
source.extend(app_artifacts.compact)
target.append(
target_dir.Dir(app_artifacts.app.fap_category).File(
app_artifacts.compact[0].name
)
)
source.append(app_artifacts.compact)
target.append(resources_root.File(dist_path))
assert len(target) == len(source)
return (target, source)
def resources_fap_dist_action(target, source, env):
# FIXME
target_dir = env.Dir("#/assets/resources/apps")
# FIXME: find a proper way to remove stale files
target_dir = env.Dir("${RESOURCES_ROOT}/apps")
shutil.rmtree(target_dir.path, ignore_errors=True)
# Iterate over pairs generated in emitter
for src, target in zip(source, target):
os.makedirs(os.path.dirname(target.path), exist_ok=True)
shutil.copy(src.path, target.path)
def generate_embed_app_metadata_emitter(target, source, env):
def embed_app_metadata_emitter(target, source, env):
app = env["APP"]
# Hack: change extension for fap libs
if app.apptype == FlipperAppType.PLUGIN:
target[0].name = target[0].name.replace(".fap", ".fal")
meta_file_name = source[0].path + ".meta"
target.append("#" + meta_file_name)
@@ -293,110 +347,14 @@ def generate_embed_app_metadata_emitter(target, source, env):
return (target, source)
class File(TypedDict):
path: str
size: int
content_path: str
class Dir(TypedDict):
path: str
def prepare_app_files(target, source, env):
app = env["APP"]
directory = app._appdir.Dir(app.fap_file_assets)
directory_path = directory.abspath
if not directory.exists():
raise UserError(f"File asset directory {directory} does not exist")
file_list: list[File] = []
directory_list: list[Dir] = []
for root, dirs, files in os.walk(directory_path):
for file_info in files:
file_path = os.path.join(root, file_info)
file_size = os.path.getsize(file_path)
file_list.append(
{
"path": os.path.relpath(file_path, directory_path),
"size": file_size,
"content_path": file_path,
}
)
for dir_info in dirs:
dir_path = os.path.join(root, dir_info)
dir_size = sum(
os.path.getsize(os.path.join(dir_path, f)) for f in os.listdir(dir_path)
)
directory_list.append(
{
"path": os.path.relpath(dir_path, directory_path),
}
)
file_list.sort(key=lambda f: f["path"])
directory_list.sort(key=lambda d: d["path"])
files_section = source[0].path + ".files.section"
with open(files_section, "wb") as f:
# u32 magic
# u32 version
# u32 dirs_count
# u32 files_count
# u32 signature_size
# u8[] signature
# Dirs:
# u32 dir_name length
# u8[] dir_name
# Files:
# u32 file_name length
# u8[] file_name
# u32 file_content_size
# u8[] file_content
# Write header magic and version
f.write(struct.pack("<II", 0x4F4C5A44, 0x01))
# Write dirs count
f.write(struct.pack("<I", len(directory_list)))
# Write files count
f.write(struct.pack("<I", len(file_list)))
md5_hash = hashlib.md5()
md5_hash_size = len(md5_hash.digest())
# write signature size and null signature, we'll fill it in later
f.write(struct.pack("<I", md5_hash_size))
signature_offset = f.tell()
f.write(b"\x00" * md5_hash_size)
# Write dirs
for dir_info in directory_list:
f.write(struct.pack("<I", len(dir_info["path"]) + 1))
f.write(dir_info["path"].encode("ascii") + b"\x00")
md5_hash.update(dir_info["path"].encode("ascii") + b"\x00")
# Write files
for file_info in file_list:
f.write(struct.pack("<I", len(file_info["path"]) + 1))
f.write(file_info["path"].encode("ascii") + b"\x00")
f.write(struct.pack("<I", file_info["size"]))
md5_hash.update(file_info["path"].encode("ascii") + b"\x00")
with open(file_info["content_path"], "rb") as content_file:
content = content_file.read()
f.write(content)
md5_hash.update(content)
# Write signature
f.seek(signature_offset)
f.write(md5_hash.digest())
bundler = FileBundler(directory.abspath)
bundler.export(source[0].path + ".files.section")
def generate_embed_app_metadata_actions(source, target, env, for_signature):
@@ -437,6 +395,7 @@ def generate(env, **kw):
env.SetDefault(
EXT_APPS_WORK_DIR="${FBT_FAP_DEBUG_ELF_ROOT}",
APP_RUN_SCRIPT="${FBT_SCRIPT_DIR}/runfap.py",
STORAGE_SCRIPT="${FBT_SCRIPT_DIR}/storage.py",
)
if not env["VERBOSE"]:
env.SetDefault(
@@ -449,10 +408,12 @@ def generate(env, **kw):
env.SetDefault(
EXT_APPS={}, # appid -> FlipperExternalAppInfo
EXT_LIBS={},
_APP_ICONS=[],
)
env.AddMethod(BuildAppElf)
env.AddMethod(GetExtAppFromPath)
env.AddMethod(GetExtAppByIdOrPath)
env.Append(
BUILDERS={
"FapDist": Builder(
@@ -466,7 +427,7 @@ def generate(env, **kw):
generator=generate_embed_app_metadata_actions,
suffix=".fap",
src_suffix=".elf",
# emitter=generate_embed_app_metadata_emitter,
emitter=embed_app_metadata_emitter,
),
"ValidateAppImports": Builder(
action=[

View File

@@ -220,7 +220,7 @@ def gen_sdk_data(sdk_cache: SdkCache):
def _check_sdk_is_up2date(sdk_cache: SdkCache):
if not sdk_cache.is_buildable():
raise UserError(
"SDK version is not finalized, please review changes and re-run operation"
"SDK version is not finalized, please review changes and re-run operation. See AppsOnSDCard.md for more details"
)

View File

@@ -4,6 +4,9 @@ import serial
import time
import hashlib
import math
import logging
import posixpath
import enum
def timing(func):
@@ -25,12 +28,47 @@ def timing(func):
return wrapper
class StorageErrorCode(enum.Enum):
OK = "OK"
NOT_READY = "filesystem not ready"
EXIST = "file/dir already exist"
NOT_EXIST = "file/dir not exist"
INVALID_PARAMETER = "invalid parameter"
DENIED = "access denied"
INVALID_NAME = "invalid name/path"
INTERNAL = "internal error"
NOT_IMPLEMENTED = "function not implemented"
ALREADY_OPEN = "file is already open"
UNKNOWN = "unknown error"
@property
def is_error(self):
return self != self.OK
@classmethod
def from_value(cls, s: str | bytes):
if isinstance(s, bytes):
s = s.decode("ascii")
for code in cls:
if code.value == s:
return code
return cls.UNKNOWN
class FlipperStorageException(Exception):
def __init__(self, message):
super().__init__(f"Storage error: {message}")
def __init__(self, path: str, error_code: StorageErrorCode):
super().__init__(f"Storage error: path '{path}': {error_code.value}")
class BufferedRead:
def __init__(self, stream):
self.buffer = bytearray()
self.stream = stream
def until(self, eol="\n", cut_eol=True):
def until(self, eol: str = "\n", cut_eol: bool = True):
eol = eol.encode("ascii")
while True:
# search in buffer
@@ -59,9 +97,15 @@ class FlipperStorage:
self.port.timeout = 2
self.port.baudrate = 115200 # Doesn't matter for VCP
self.read = BufferedRead(self.port)
self.last_error = ""
self.chunk_size = chunk_size
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
def start(self):
self.port.open()
self.port.reset_input_buffer()
@@ -71,37 +115,34 @@ class FlipperStorage:
# And read buffer until we get prompt
self.read.until(self.CLI_PROMPT)
def stop(self):
def stop(self) -> None:
self.port.close()
def send(self, line):
def send(self, line: str) -> None:
self.port.write(line.encode("ascii"))
def send_and_wait_eol(self, line):
def send_and_wait_eol(self, line: str):
self.send(line)
return self.read.until(self.CLI_EOL)
def send_and_wait_prompt(self, line):
def send_and_wait_prompt(self, line: str):
self.send(line)
return self.read.until(self.CLI_PROMPT)
def has_error(self, data):
"""Is data has error"""
if data.find(b"Storage error") != -1:
return True
else:
return False
def has_error(self, data: bytes | str) -> bool:
"""Is data an error message"""
return data.find(b"Storage error:") != -1
def get_error(self, data):
def get_error(self, data: bytes) -> StorageErrorCode:
"""Extract error text from data and print it"""
error, error_text = data.decode("ascii").split(": ")
return error_text.strip()
_, error_text = data.decode("ascii").split(": ")
return StorageErrorCode.from_value(error_text.strip())
def list_tree(self, path="/", level=0):
def list_tree(self, path: str = "/", level: int = 0):
"""List files and dirs on Flipper"""
path = path.replace("//", "/")
self.send_and_wait_eol('storage list "' + path + '"\r')
self.send_and_wait_eol(f'storage list "{path}"\r')
data = self.read.until(self.CLI_PROMPT)
lines = data.split(b"\r\n")
@@ -139,7 +180,7 @@ class FlipperStorage:
# Something wrong, pass
pass
def walk(self, path="/"):
def walk(self, path: str = "/"):
dirs = []
nondirs = []
walk_dirs = []
@@ -181,14 +222,15 @@ class FlipperStorage:
# Something wrong, pass
pass
# topdown walk, yield before recursy
# topdown walk, yield before recursing
yield path, dirs, nondirs
for new_path in walk_dirs:
yield from self.walk(new_path)
def send_file(self, filename_from, filename_to):
def send_file(self, filename_from: str, filename_to: str):
"""Send file from local device to Flipper"""
self.remove(filename_to)
if self.exist_file(filename_to):
self.remove(filename_to)
with open(filename_from, "rb") as file:
filesize = os.fstat(file.fileno()).st_size
@@ -203,9 +245,9 @@ class FlipperStorage:
self.send_and_wait_eol(f'storage write_chunk "{filename_to}" {size}\r')
answer = self.read.until(self.CLI_EOL)
if self.has_error(answer):
self.last_error = self.get_error(answer)
last_error = self.get_error(answer)
self.read.until(self.CLI_PROMPT)
return False
raise FlipperStorageException(filename_to, last_error)
self.port.write(filedata)
self.read.until(self.CLI_PROMPT)
@@ -218,9 +260,8 @@ class FlipperStorage:
)
sys.stdout.flush()
print()
return True
def read_file(self, filename):
def read_file(self, filename: str):
"""Receive file from Flipper, and get filedata (bytes)"""
buffer_size = self.chunk_size
self.send_and_wait_eol(
@@ -229,9 +270,10 @@ class FlipperStorage:
answer = self.read.until(self.CLI_EOL)
filedata = bytearray()
if self.has_error(answer):
self.last_error = self.get_error(answer)
last_error = self.get_error(answer)
self.read.until(self.CLI_PROMPT)
return filedata
raise FlipperStorageException(filename, last_error)
# return filedata
size = int(answer.split(b": ")[1])
read_size = 0
@@ -251,121 +293,89 @@ class FlipperStorage:
self.read.until(self.CLI_PROMPT)
return filedata
def receive_file(self, filename_from, filename_to):
def receive_file(self, filename_from: str, filename_to: str):
"""Receive file from Flipper to local storage"""
with open(filename_to, "wb") as file:
data = self.read_file(filename_from)
if not data:
return False
else:
file.write(data)
return True
file.write(data)
def exist(self, path):
"""Is file or dir exist on Flipper"""
self.send_and_wait_eol('storage stat "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
def exist(self, path: str):
"""Does file or dir exist on Flipper"""
self.send_and_wait_eol(f'storage stat "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
return True
return not self.has_error(response)
def exist_dir(self, path):
"""Is dir exist on Flipper"""
self.send_and_wait_eol('storage stat "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
def exist_dir(self, path: str):
"""Does dir exist on Flipper"""
self.send_and_wait_eol(f'storage stat "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
if self.has_error(response):
error_code = self.get_error(response)
if error_code in (
StorageErrorCode.NOT_EXIST,
StorageErrorCode.INVALID_NAME,
):
return False
raise FlipperStorageException(path, error_code)
return True
def exist_file(self, path: str):
"""Does file exist on Flipper"""
self.send_and_wait_eol(f'storage stat "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
if answer.find(b"Directory") != -1:
return True
elif answer.find(b"Storage") != -1:
return True
else:
return False
return response.find(b"File, size:") != -1
def exist_file(self, path):
"""Is file exist on Flipper"""
self.send_and_wait_eol('storage stat "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
def _check_no_error(self, response, path=None):
if self.has_error(response):
raise FlipperStorageException(self.get_error(response))
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
if answer.find(b"File, size:") != -1:
return True
else:
return False
def size(self, path):
def size(self, path: str):
"""file size on Flipper"""
self.send_and_wait_eol('storage stat "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
self.send_and_wait_eol(f'storage stat "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
if answer.find(b"File, size:") != -1:
size = int(
"".join(
ch
for ch in answer.split(b": ")[1].decode("ascii")
if ch.isdigit()
)
self._check_no_error(response, path)
if response.find(b"File, size:") != -1:
size = int(
"".join(
ch
for ch in response.split(b": ")[1].decode("ascii")
if ch.isdigit()
)
return size
else:
self.last_error = "access denied"
return -1
)
return size
raise FlipperStorageException("Not a file")
def mkdir(self, path):
def mkdir(self, path: str):
"""Create a directory on Flipper"""
self.send_and_wait_eol('storage mkdir "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
self.send_and_wait_eol(f'storage mkdir "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
return True
self._check_no_error(response, path)
def format_ext(self):
"""Create a directory on Flipper"""
"""Format external storage on Flipper"""
self.send_and_wait_eol("storage format /ext\r")
self.send_and_wait_eol("y\r")
answer = self.read.until(self.CLI_EOL)
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
self._check_no_error(response, "/ext")
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
return True
def remove(self, path):
def remove(self, path: str):
"""Remove file or directory on Flipper"""
self.send_and_wait_eol('storage remove "' + path + '"\r')
answer = self.read.until(self.CLI_EOL)
self.send_and_wait_eol(f'storage remove "{path}"\r')
response = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
self._check_no_error(response, path)
if self.has_error(answer):
self.last_error = self.get_error(answer)
return False
else:
return True
def hash_local(self, filename):
def hash_local(self, filename: str):
"""Hash of local file"""
hash_md5 = hashlib.md5()
with open(filename, "rb") as f:
@@ -373,14 +383,112 @@ class FlipperStorage:
hash_md5.update(chunk)
return hash_md5.hexdigest()
def hash_flipper(self, filename):
def hash_flipper(self, filename: str):
"""Get hash of file on Flipper"""
self.send_and_wait_eol('storage md5 "' + filename + '"\r')
hash = self.read.until(self.CLI_EOL)
self.read.until(self.CLI_PROMPT)
self._check_no_error(hash, filename)
return hash.decode("ascii")
if self.has_error(hash):
self.last_error = self.get_error(hash)
return ""
class FlipperStorageOperations:
def __init__(self, storage):
self.storage: FlipperStorage = storage
self.logger = logging.getLogger("FStorageOps")
def send_file_to_storage(
self, flipper_file_path: str, local_file_path: str, force: bool = False
):
self.logger.debug(
f"* send_file_to_storage: {local_file_path}->{flipper_file_path}, {force=}"
)
exists = self.storage.exist_file(flipper_file_path)
do_upload = not exists
if exists:
hash_local = self.storage.hash_local(local_file_path)
hash_flipper = self.storage.hash_flipper(flipper_file_path)
self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
do_upload = force or (hash_local != hash_flipper)
if do_upload:
self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
self.storage.send_file(local_file_path, flipper_file_path)
# make directory with exist check
def mkpath(self, flipper_dir_path: str):
path_components, dirs_to_create = flipper_dir_path.split("/"), []
while not self.storage.exist_dir(dir_path := "/".join(path_components)):
self.logger.debug(f'"{dir_path}" does not exist, will create')
dirs_to_create.append(path_components.pop())
for dir_to_create in reversed(dirs_to_create):
path_components.append(dir_to_create)
self.storage.mkdir("/".join(path_components))
# send file or folder recursively
def recursive_send(self, flipper_path: str, local_path: str, force: bool = False):
if not os.path.exists(local_path):
raise FlipperStorageException(f'"{local_path}" does not exist')
if os.path.isdir(local_path):
# create parent dir
self.mkpath(flipper_path)
for dirpath, dirnames, filenames in os.walk(local_path):
self.logger.debug(f'Processing directory "{os.path.normpath(dirpath)}"')
dirnames.sort()
filenames.sort()
rel_path = os.path.relpath(dirpath, local_path)
# create subdirs
for dirname in dirnames:
flipper_dir_path = os.path.join(flipper_path, rel_path, dirname)
flipper_dir_path = os.path.normpath(flipper_dir_path).replace(
os.sep, "/"
)
self.mkpath(flipper_dir_path)
# send files
for filename in filenames:
flipper_file_path = os.path.join(flipper_path, rel_path, filename)
flipper_file_path = os.path.normpath(flipper_file_path).replace(
os.sep, "/"
)
local_file_path = os.path.normpath(os.path.join(dirpath, filename))
self.send_file_to_storage(flipper_file_path, local_file_path, force)
else:
return hash.decode("ascii")
self.mkpath(posixpath.dirname(flipper_path))
self.send_file_to_storage(flipper_path, local_path, force)
def recursive_receive(self, flipper_path: str, local_path: str):
if self.storage.exist_dir(flipper_path):
for dirpath, dirnames, filenames in self.storage.walk(flipper_path):
self.logger.debug(
f'Processing directory "{os.path.normpath(dirpath)}"'.replace(
os.sep, "/"
)
)
dirnames.sort()
filenames.sort()
rel_path = os.path.relpath(dirpath, flipper_path)
for dirname in dirnames:
local_dir_path = os.path.join(local_path, rel_path, dirname)
local_dir_path = os.path.normpath(local_dir_path)
os.makedirs(local_dir_path, exist_ok=True)
for filename in filenames:
local_file_path = os.path.join(local_path, rel_path, filename)
local_file_path = os.path.normpath(local_file_path)
flipper_file_path = os.path.normpath(
os.path.join(dirpath, filename)
).replace(os.sep, "/")
self.logger.info(
f'Receiving "{flipper_file_path}" to "{local_file_path}"'
)
self.storage.receive_file(flipper_file_path, local_file_path)
else:
self.logger.info(f'Receiving "{flipper_path}" to "{local_path}"')
self.storage.receive_file(flipper_path, local_path)

View File

@@ -1,9 +0,0 @@
ansi==0.3.6
black==22.6.0
colorlog==6.7.0
heatshrink2==0.11.0
Pillow==9.1.1
protobuf==3.20.1
pyserial==3.5
python3-protobuf==2.5.0
SCons==4.4.0

View File

@@ -1,108 +1,86 @@
#!/usr/bin/env python3
import posixpath
from typing import final
from flipper.app import App
from flipper.storage import FlipperStorage
from flipper.storage import FlipperStorage, FlipperStorageOperations
from flipper.utils.cdc import resolve_port
import logging
import os
import pathlib
import serial.tools.list_ports as list_ports
import posixpath
from functools import reduce
import operator
class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-n",
"--no-launch",
dest="launch_app",
action="store_false",
help="Don't launch app",
"--sources",
"-s",
nargs="+",
action="append",
default=[],
help="Files to send",
)
self.parser.add_argument(
"--targets",
"-t",
nargs="+",
action="append",
default=[],
help="File destinations (must be same length as -s)",
)
self.parser.add_argument(
"--host-app",
"-a",
help="Host app to launch",
)
self.parser.add_argument("fap_src_path", help="App file to upload")
self.parser.add_argument(
"--fap_dst_dir", help="Upload path", default="/ext/apps", required=False
)
self.parser.set_defaults(func=self.install)
# logging
self.logger = logging.getLogger()
# make directory with exist check
def mkdir_on_storage(self, storage, flipper_dir_path):
if not storage.exist_dir(flipper_dir_path):
self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
if not storage.mkdir(flipper_dir_path):
self.logger.error(f"Error: {storage.last_error}")
return False
else:
self.logger.debug(f'"{flipper_dir_path}" already exists')
return True
# send file with exist check and hash check
def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
exists = storage.exist_file(flipper_file_path)
do_upload = not exists
if exists:
hash_local = storage.hash_local(local_file_path)
hash_flipper = storage.hash_flipper(flipper_file_path)
self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
do_upload = force or (hash_local != hash_flipper)
if do_upload:
self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
if not storage.send_file(local_file_path, flipper_file_path):
self.logger.error(f"Error: {storage.last_error}")
return False
return True
@staticmethod
def flatten(l):
return reduce(operator.concat, l, [])
def install(self):
if not (port := resolve_port(self.logger, self.args.port)):
self.args.sources = self.flatten(self.args.sources)
self.args.targets = self.flatten(self.args.targets)
if len(self.args.sources) != len(self.args.targets):
self.logger.error(
f"Error: sources ({self.args.sources}) and targets ({self.args.targets}) must be same length"
)
return 1
storage = FlipperStorage(port)
storage.start()
if not (port := resolve_port(self.logger, self.args.port)):
return 2
try:
fap_local_path = self.args.fap_src_path
self.args.fap_dst_dir = self.args.fap_dst_dir.rstrip("/\\")
with FlipperStorage(port) as storage:
storage_ops = FlipperStorageOperations(storage)
for fap_local_path, fap_dst_path in zip(
self.args.sources, self.args.targets
):
self.logger.info(f'Installing "{fap_local_path}" to {fap_dst_path}')
if not os.path.isfile(fap_local_path):
self.logger.error(f"Error: source .fap ({fap_local_path}) not found")
return -1
storage_ops.recursive_send(fap_dst_path, fap_local_path, False)
fap_dst_path = posixpath.join(
self.args.fap_dst_dir, os.path.basename(fap_local_path)
)
fap_host_app = self.args.targets[0]
startup_command = f'"Applications" {fap_host_app}'
if self.args.host_app:
startup_command = self.args.host_app
self.logger.info(f'Installing "{fap_local_path}" to {fap_dst_path}')
self.logger.info(f"Launching app: {startup_command}")
storage.send_and_wait_eol(f"loader open {startup_command}\r")
if not self.mkdir_on_storage(storage, self.args.fap_dst_dir):
self.logger.error(f"Error: cannot create dir: {storage.last_error}")
return -2
if not self.send_file_to_storage(
storage, fap_dst_path, fap_local_path, False
):
self.logger.error(f"Error: upload failed: {storage.last_error}")
return -3
if self.args.launch_app:
storage.send_and_wait_eol(
f'loader open "Applications" {fap_dst_path}\r'
)
result = storage.read.until(storage.CLI_EOL)
if len(result):
if len(result := storage.read.until(storage.CLI_EOL)):
self.logger.error(f"Unexpected response: {result.decode('ascii')}")
return -4
return 3
return 0
return 0
finally:
storage.stop()
except Exception as e:
self.logger.error(f"Error: {e}")
# raise
return 4
if __name__ == "__main__":

View File

@@ -2,7 +2,7 @@
from typing import final
from flipper.app import App
from flipper.storage import FlipperStorage
from flipper.storage import FlipperStorage, FlipperStorageOperations
from flipper.utils.cdc import resolve_port
import logging
@@ -24,89 +24,47 @@ class Main(App):
# logging
self.logger = logging.getLogger()
# make directory with exist check
def mkdir_on_storage(self, storage, flipper_dir_path):
if not storage.exist_dir(flipper_dir_path):
self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
if not storage.mkdir(flipper_dir_path):
self.logger.error(f"Error: {storage.last_error}")
return False
else:
self.logger.debug(f'"{flipper_dir_path}" already exists')
return True
# send file with exist check and hash check
def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
exists = storage.exist_file(flipper_file_path)
do_upload = not exists
if exists:
hash_local = storage.hash_local(local_file_path)
hash_flipper = storage.hash_flipper(flipper_file_path)
self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
do_upload = force or (hash_local != hash_flipper)
if do_upload:
self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
if not storage.send_file(local_file_path, flipper_file_path):
self.logger.error(f"Error: {storage.last_error}")
return False
return True
def install(self):
if not (port := resolve_port(self.logger, self.args.port)):
return 1
storage = FlipperStorage(port)
storage.start()
if not os.path.isfile(self.args.manifest_path):
self.logger.error("Error: manifest not found")
return 2
manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
pkg_dir_name = self.args.pkg_dir_name or pkg_name
update_root = "/ext/update"
flipper_update_path = f"{update_root}/{pkg_dir_name}"
self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
try:
if not os.path.isfile(self.args.manifest_path):
self.logger.error("Error: manifest not found")
return 2
with FlipperStorage(port) as storage:
storage_ops = FlipperStorageOperations(storage)
storage_ops.mkpath(update_root)
storage_ops.mkpath(flipper_update_path)
storage_ops.recursive_send(
flipper_update_path, manifest_path.parents[0]
)
manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
pkg_dir_name = self.args.pkg_dir_name or pkg_name
update_root = "/ext/update"
flipper_update_path = f"{update_root}/{pkg_dir_name}"
self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
# if not os.path.exists(self.args.manifest_path):
# self.logger.error("Error: package not found")
if not self.mkdir_on_storage(
storage, update_root
) or not self.mkdir_on_storage(storage, flipper_update_path):
self.logger.error(f"Error: cannot create {storage.last_error}")
return -2
for dirpath, dirnames, filenames in os.walk(manifest_path.parents[0]):
for fname in filenames:
self.logger.debug(f"Uploading {fname}")
local_file_path = os.path.join(dirpath, fname)
flipper_file_path = f"{flipper_update_path}/{fname}"
if not self.send_file_to_storage(
storage, flipper_file_path, local_file_path, False
):
self.logger.error(f"Error: {storage.last_error}")
return -3
# return -11
storage.send_and_wait_eol(
f"update install {flipper_update_path}/{manifest_name}\r"
)
result = storage.read.until(storage.CLI_EOL)
if not b"Verifying" in result:
self.logger.error(f"Unexpected response: {result.decode('ascii')}")
return -4
return 3
result = storage.read.until(storage.CLI_EOL)
if not result.startswith(b"OK"):
self.logger.error(result.decode("ascii"))
return -5
break
return 0
finally:
storage.stop()
return 4
return 0
except Exception as e:
self.logger.error(e)
return 5
if __name__ == "__main__":

View File

@@ -1,16 +1,28 @@
#!/usr/bin/env python3
from flipper.app import App
from flipper.storage import FlipperStorage
from flipper.storage import FlipperStorage, FlipperStorageOperations
from flipper.utils.cdc import resolve_port
import logging
import os
import binascii
import filecmp
import tempfile
def WrapStorageOp(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
return 0
except Exception as e:
print(f"Error: {e}")
# raise # uncomment to debug
return 1
return wrapper
class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
@@ -71,229 +83,71 @@ class Main(App):
)
self.parser_stress.set_defaults(func=self.stress)
def _get_storage(self):
def _get_port(self):
if not (port := resolve_port(self.logger, self.args.port)):
return None
storage = FlipperStorage(port)
storage.start()
return storage
raise Exception("Failed to resolve port")
return port
@WrapStorageOp
def mkdir(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug(f'Creating "{self.args.flipper_path}"')
if not storage.mkdir(self.args.flipper_path):
self.logger.error(f"Error: {storage.last_error}")
storage.stop()
return 0
with FlipperStorage(self._get_port()) as storage:
storage.mkdir(self.args.flipper_path)
@WrapStorageOp
def remove(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug(f'Removing "{self.args.flipper_path}"')
if not storage.remove(self.args.flipper_path):
self.logger.error(f"Error: {storage.last_error}")
storage.stop()
return 0
with FlipperStorage(self._get_port()) as storage:
storage.remove(self.args.flipper_path)
@WrapStorageOp
def receive(self):
if not (storage := self._get_storage()):
return 1
if storage.exist_dir(self.args.flipper_path):
for dirpath, dirnames, filenames in storage.walk(self.args.flipper_path):
self.logger.debug(
f'Processing directory "{os.path.normpath(dirpath)}"'.replace(
os.sep, "/"
)
)
dirnames.sort()
filenames.sort()
rel_path = os.path.relpath(dirpath, self.args.flipper_path)
for dirname in dirnames:
local_dir_path = os.path.join(
self.args.local_path, rel_path, dirname
)
local_dir_path = os.path.normpath(local_dir_path)
os.makedirs(local_dir_path, exist_ok=True)
for filename in filenames:
local_file_path = os.path.join(
self.args.local_path, rel_path, filename
)
local_file_path = os.path.normpath(local_file_path)
flipper_file_path = os.path.normpath(
os.path.join(dirpath, filename)
).replace(os.sep, "/")
self.logger.info(
f'Receiving "{flipper_file_path}" to "{local_file_path}"'
)
if not storage.receive_file(flipper_file_path, local_file_path):
self.logger.error(f"Error: {storage.last_error}")
else:
self.logger.info(
f'Receiving "{self.args.flipper_path}" to "{self.args.local_path}"'
with FlipperStorage(self._get_port()) as storage:
FlipperStorageOperations(storage).recursive_receive(
self.args.flipper_path, self.args.local_path
)
if not storage.receive_file(self.args.flipper_path, self.args.local_path):
self.logger.error(f"Error: {storage.last_error}")
storage.stop()
return 0
@WrapStorageOp
def send(self):
if not (storage := self._get_storage()):
return 1
self.send_to_storage(
storage, self.args.flipper_path, self.args.local_path, self.args.force
)
storage.stop()
return 0
# send file or folder recursively
def send_to_storage(self, storage, flipper_path, local_path, force):
if not os.path.exists(local_path):
self.logger.error(f'Error: "{local_path}" is not exist')
if os.path.isdir(local_path):
# create parent dir
self.mkdir_on_storage(storage, flipper_path)
for dirpath, dirnames, filenames in os.walk(local_path):
self.logger.debug(f'Processing directory "{os.path.normpath(dirpath)}"')
dirnames.sort()
filenames.sort()
rel_path = os.path.relpath(dirpath, local_path)
# create subdirs
for dirname in dirnames:
flipper_dir_path = os.path.join(flipper_path, rel_path, dirname)
flipper_dir_path = os.path.normpath(flipper_dir_path).replace(
os.sep, "/"
)
self.mkdir_on_storage(storage, flipper_dir_path)
# send files
for filename in filenames:
flipper_file_path = os.path.join(flipper_path, rel_path, filename)
flipper_file_path = os.path.normpath(flipper_file_path).replace(
os.sep, "/"
)
local_file_path = os.path.normpath(os.path.join(dirpath, filename))
self.send_file_to_storage(
storage, flipper_file_path, local_file_path, force
)
else:
self.send_file_to_storage(storage, flipper_path, local_path, force)
# make directory with exist check
def mkdir_on_storage(self, storage, flipper_dir_path):
if not storage.exist_dir(flipper_dir_path):
self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
if not storage.mkdir(flipper_dir_path):
self.logger.error(f"Error: {storage.last_error}")
else:
self.logger.debug(f'"{flipper_dir_path}" already exists')
# send file with exist check and hash check
def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
if not storage.exist_file(flipper_file_path):
self.logger.debug(
f'"{flipper_file_path}" does not exist, sending "{local_file_path}"'
with FlipperStorage(self._get_port()) as storage:
FlipperStorageOperations(storage).recursive_send(
self.args.flipper_path, self.args.local_path, self.args.force
)
self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
if not storage.send_file(local_file_path, flipper_file_path):
self.logger.error(f"Error: {storage.last_error}")
elif force:
self.logger.debug(
f'"{flipper_file_path}" exists, but will be overwritten by "{local_file_path}"'
)
self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
if not storage.send_file(local_file_path, flipper_file_path):
self.logger.error(f"Error: {storage.last_error}")
else:
self.logger.debug(
f'"{flipper_file_path}" exists, compare hash with "{local_file_path}"'
)
hash_local = storage.hash_local(local_file_path)
hash_flipper = storage.hash_flipper(flipper_file_path)
if not hash_flipper:
self.logger.error(f"Error: {storage.last_error}")
if hash_local == hash_flipper:
self.logger.debug(
f'"{flipper_file_path}" is equal to "{local_file_path}"'
)
else:
self.logger.debug(
f'"{flipper_file_path}" is NOT equal to "{local_file_path}"'
)
self.logger.info(
f'Sending "{local_file_path}" to "{flipper_file_path}"'
)
if not storage.send_file(local_file_path, flipper_file_path):
self.logger.error(f"Error: {storage.last_error}")
@WrapStorageOp
def read(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug(f'Reading "{self.args.flipper_path}"')
data = storage.read_file(self.args.flipper_path)
if not data:
self.logger.error(f"Error: {storage.last_error}")
else:
with FlipperStorage(self._get_port()) as storage:
data = storage.read_file(self.args.flipper_path)
try:
print("Text data:")
print(data.decode())
except:
print("Binary hexadecimal data:")
print(binascii.hexlify(data).decode())
storage.stop()
return 0
@WrapStorageOp
def size(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug(f'Getting size of "{self.args.flipper_path}"')
size = storage.size(self.args.flipper_path)
if size < 0:
self.logger.error(f"Error: {storage.last_error}")
else:
print(size)
storage.stop()
return 0
with FlipperStorage(self._get_port()) as storage:
print(storage.size(self.args.flipper_path))
@WrapStorageOp
def list(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug(f'Listing "{self.args.flipper_path}"')
storage.list_tree(self.args.flipper_path)
storage.stop()
return 0
with FlipperStorage(self._get_port()) as storage:
storage.list_tree(self.args.flipper_path)
@WrapStorageOp
def format_ext(self):
if not (storage := self._get_storage()):
return 1
self.logger.debug("Formatting /ext SD card")
with FlipperStorage(self._get_port()) as storage:
storage.format_ext()
if not storage.format_ext():
self.logger.error(f"Error: {storage.last_error}")
storage.stop()
return 0
@WrapStorageOp
def stress(self):
self.logger.error("This test is wearing out flash memory.")
self.logger.error("Never use it with internal storage(/int)")
self.logger.error("Never use it with internal storage (/int)")
if self.args.flipper_path.startswith(
"/int"
@@ -312,24 +166,19 @@ class Main(App):
with open(send_file_name, "w") as fout:
fout.write("A" * self.args.file_size)
storage = self._get_storage()
if not storage:
return 1
if storage.exist_file(self.args.flipper_path):
self.logger.error("File exists, remove it first")
return
while self.args.count > 0:
storage.send_file(send_file_name, self.args.flipper_path)
storage.receive_file(self.args.flipper_path, receive_file_name)
if not filecmp.cmp(receive_file_name, send_file_name):
self.logger.error("Files mismatch")
break
storage.remove(self.args.flipper_path)
os.unlink(receive_file_name)
self.args.count -= 1
storage.stop()
return 0
with FlipperStorage(self._get_port()) as storage:
if storage.exist_file(self.args.flipper_path):
self.logger.error("File exists, remove it first")
return
while self.args.count > 0:
storage.send_file(send_file_name, self.args.flipper_path)
storage.receive_file(self.args.flipper_path, receive_file_name)
if not filecmp.cmp(receive_file_name, send_file_name):
self.logger.error("Files mismatch")
break
storage.remove(self.args.flipper_path)
os.unlink(receive_file_name)
self.args.count -= 1
if __name__ == "__main__":