blob: bb55bdc06e95b9e4bdb5074b7b8b8329d57858d7 [file] [log] [blame]
############################################################################
# apps/examples/shv-nxboot-updater/update-script/shvflasher.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
import asyncio
import io
import zlib
from shv import SHVBytes
from shv.rpcapi.valueclient import SHVValueClient
from shv.rpcurl import RpcUrl
async def shv_flasher(
    connection: str, name: str, path_to_root: str, queue: asyncio.Queue | None
) -> None:
    """Upload a firmware image over SHV, verify its CRC and reset the device.

    The image is written chunk by chunk to the ``<path_to_root>/fwUpdate``
    node. Afterwards the device is asked for the CRC of the uploaded range;
    only when it equals the locally computed CRC32 is ``reset`` called on
    ``<path_to_root>/.device``.

    Args:
        connection: RPC URL string, parsed with ``RpcUrl.parse``.
        name: Path of the firmware image file to upload.
        path_to_root: SHV path under which ``fwUpdate`` and ``.device`` live.
        queue: Optional queue that receives the upload progress as an integer
            percentage after every chunk; pass ``None`` to disable reporting.

    Raises:
        ValueError: If the device reports a non-positive maximum write size.
        OSError: If the firmware file cannot be opened or read.
    """
    url = RpcUrl.parse(connection)
    client = await SHVValueClient.connect(url)
    assert client is not None

    node_name = f"{path_to_root}/fwUpdate"
    node_name_dotdevice = f"{path_to_root}/.device"

    # Always release the connection, even when an RPC call or file read fails.
    try:
        res = await client.call(node_name, "stat")

        # NOTE(review): indices 1 and 5 of the "stat" result are taken to be
        # the file size limit and the maximum write size - confirm against the
        # SHV file node specification.
        maxfilesize = res[1]
        maxwrite = res[5]
        print(f"Received maximum enabled write size {maxwrite}.")
        print(f"Received maximum file size is {maxfilesize}")

        # A zero chunk size would upload nothing (and could still "match" an
        # empty CRC); a negative one would make read() return the whole file.
        if maxwrite <= 0:
            raise ValueError(
                f"Device reported an invalid maximum write size: {maxwrite!r}"
            )

        print(f"Started uploading new firmware {name}... this may take some time.")

        sent = 0
        crc = 0
        with open(name, mode="rb") as f:
            # The total size is needed for progress reporting; measure it
            # regardless of whether a queue was supplied.
            f.seek(0, io.SEEK_END)
            size = f.tell()
            print(f"File's size is {size}")
            f.seek(0, io.SEEK_SET)

            # The CRC is computed with zlib; the original author notes that
            # NuttX uses the same polynomial.
            i = 0
            while data := f.read(maxwrite):
                print("Uploading chunk n.", i + 1)
                crc = zlib.crc32(data, crc)
                # The offset is the number of bytes already written, so a
                # short read can never misplace the following chunk.
                await client.call(node_name, "write", [sent, SHVBytes(data)])
                sent += len(data)
                i += 1
                if queue is not None and size > 0:
                    # Byte-based progress, clamped so that a partial last
                    # chunk can never report more than 100 %.
                    queue.put_nowait(min(100, (sent * 100) // size))

        print("Flashing completed!")

        # Ask the device for the CRC of exactly the bytes that were uploaded.
        res = await client.call(node_name, "crc", [0, sent])
        # The device may return the CRC as a signed value; reinterpret it as
        # an unsigned 32-bit integer before comparing.
        res = res & 0xFFFFFFFF
        print(f"Calculated CRC: {hex(crc)} and received: {hex(res)}")

        if res == crc:
            print("Checksum match, perform reset!")
            await client.call(node_name_dotdevice, "reset")
        else:
            print("Checksum mismatch!")
    finally:
        await client.disconnect()