Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
except ImportError as e:
have_test = False

import meshtastic.ota
import meshtastic.util
import meshtastic.serial_interface
import meshtastic.tcp_interface
Expand All @@ -60,7 +61,7 @@
have_powermon = False
powermon_exception = e
meter = None
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2, mesh_pb2
from meshtastic.protobuf import admin_pb2, channel_pb2, config_pb2, portnums_pb2, mesh_pb2
from meshtastic.version import get_active_version

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -452,6 +453,41 @@ def onConnected(interface):
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).rebootOTA()

if args.ota_update:
closeNow = True
waitForAckNak = True

if not isinstance(interface, meshtastic.tcp_interface.TCPInterface):
meshtastic.util.our_exit(
"Error: OTA update currently requires a TCP connection to the node (use --host)."
)

ota = meshtastic.ota.ESP32WiFiOTA(args.ota_update, interface.hostname)

print(f"Triggering OTA update on {interface.hostname}...")
interface.getNode(args.dest, False, **getNode_kwargs).startOTA(
ota_mode=admin_pb2.OTAMode.OTA_WIFI,
ota_file_hash=ota.hash_bytes()
)

print("Waiting for device to reboot into OTA mode...")
time.sleep(5)

retries = 5
while retries > 0:
try:
ota.update()
break

except Exception as e:
retries -= 1
if retries == 0:
meshtastic.util.our_exit(f"\nOTA update failed: {e}")

time.sleep(2)

print("\nOTA update completed successfully!")

if args.enter_dfu:
closeNow = True
waitForAckNak = True
Expand Down Expand Up @@ -1904,10 +1940,18 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars

group.add_argument(
"--reboot-ota",
help="Tell the destination node to reboot into factory firmware (ESP32)",
help="Tell the destination node to reboot into factory firmware (ESP32, firmware version <2.7.18)",
action="store_true",
)

group.add_argument(
"--ota-update",
help="Perform an OTA update on the local node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). "
"Specify the path to the firmware file.",
metavar="FIRMWARE_FILE",
action="store",
)

group.add_argument(
"--enter-dfu",
help="Tell the destination node to enter DFU mode (NRF52)",
Expand Down
18 changes: 17 additions & 1 deletion meshtastic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def commitSettingsTransaction(self):
return self._sendAdmin(p, onResponse=onResponse)

def rebootOTA(self, secs: int = 10):
"""Tell the node to reboot into factory firmware."""
"""Tell the node to reboot into factory firmware (firmware < 2.7.18)."""
self.ensureSessionKey()
p = admin_pb2.AdminMessage()
p.reboot_ota_seconds = secs
Expand All @@ -667,6 +667,22 @@ def rebootOTA(self, secs: int = 10):
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)

def startOTA(
self,
ota_mode: admin_pb2.OTAMode.ValueType,
ota_file_hash: bytes,
):
"""Tell the node to start OTA mode (firmware >= 2.7.18)."""
if self != self.iface.localNode:
raise ValueError("startOTA only possible in local node")

self.ensureSessionKey()
p = admin_pb2.AdminMessage()
p.ota_request.reboot_ota_mode = ota_mode
p.ota_request.ota_hash = ota_file_hash

return self._sendAdmin(p)

def enterDFUMode(self):
"""Tell the node to enter DFU mode (NRF52)."""
self.ensureSessionKey()
Expand Down
128 changes: 128 additions & 0 deletions meshtastic/ota.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Meshtastic ESP32 Unified OTA
"""
import os
import hashlib
import socket
import logging
from typing import Optional, Callable


logger = logging.getLogger(__name__)


def _file_sha256(filename: str):
"""Calculate SHA256 hash of a file."""
sha256_hash = hashlib.sha256()

with open(filename, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)

return sha256_hash


class OTAError(Exception):
"""Exception for OTA errors."""


class ESP32WiFiOTA:
"""ESP32 WiFi Unified OTA updates."""

def __init__(self, filename: str, hostname: str, port: int = 3232):
self._filename = filename
self._hostname = hostname
self._port = port
self._socket: Optional[socket.socket] = None

if not os.path.exists(self._filename):
raise FileNotFoundError(f"File {self._filename} does not exist")

self._file_hash = _file_sha256(self._filename)

def _read_line(self) -> str:
"""Read a line from the socket."""
if not self._socket:
raise ConnectionError("Socket not connected")

line = b""
while not line.endswith(b"\n"):
char = self._socket.recv(1)

if not char:
raise ConnectionError("Connection closed while waiting for response")

line += char

return line.decode("utf-8").strip()

def hash_bytes(self) -> bytes:
"""Return the hash as bytes."""
return self._file_hash.digest()

def hash_hex(self) -> str:
"""Return the hash as a hex string."""
return self._file_hash.hexdigest()

def update(self, progress_callback: Optional[Callable[[int, int], None]] = None):
"""Perform the OTA update."""
with open(self._filename, "rb") as f:
data = f.read()
size = len(data)

logger.info(f"Starting OTA update with {self._filename} ({size} bytes, hash {self.hash_hex()})")

self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(15)
try:
self._socket.connect((self._hostname, self._port))
logger.debug(f"Connected to {self._hostname}:{self._port}")

# Send start command
self._socket.sendall(f"OTA {size} {self.hash_hex()}\n".encode("utf-8"))

# Wait for OK from the device
while True:
response = self._read_line()
if response == "OK":
break

if response == "ERASING":
logger.info("Device is erasing flash...")
elif response.startswith("ERR "):
raise OTAError(f"Device reported error: {response}")
else:
logger.warning(f"Unexpected response: {response}")

# Stream firmware
sent_bytes = 0
chunk_size = 1024
while sent_bytes < size:
chunk = data[sent_bytes : sent_bytes + chunk_size]
self._socket.sendall(chunk)
sent_bytes += len(chunk)

if progress_callback:
progress_callback(sent_bytes, size)
else:
print(f"[{sent_bytes / size * 100:5.1f}%] Sent {sent_bytes} of {size} bytes...", end="\r")

if not progress_callback:
print()

# Wait for OK from device
logger.info("Firmware sent, waiting for verification...")
while True:
response = self._read_line()
if response == "OK":
logger.info("OTA update completed successfully!")
break

if response.startswith("ERR "):
raise OTAError(f"OTA update failed: {response}")
elif response != "ACK":
logger.warning(f"Unexpected final response: {response}")

finally:
if self._socket:
self._socket.close()
self._socket = None
66 changes: 66 additions & 0 deletions meshtastic/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import platform
import re
import sys
import tempfile
from unittest.mock import mock_open, MagicMock, patch

import pytest
Expand Down Expand Up @@ -2900,3 +2901,68 @@ def test_main_set_ham_empty_string(capsys):
out, _ = capsys.readouterr()
assert "ERROR: Ham radio callsign cannot be empty or contain only whitespace characters" in out
assert excinfo.value.code == 1


# OTA-related tests
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_ota_update_file_not_found(capsys):
"""Test --ota-update with non-existent file"""
sys.argv = [
"",
"--ota-update",
"/nonexistent/firmware.bin",
"--host",
"192.168.1.100",
]
mt_config.args = sys.argv

with pytest.raises(SystemExit) as pytest_wrapped_e:
main()

assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("meshtastic.ota.ESP32WiFiOTA")
@patch("meshtastic.__main__.meshtastic.util.our_exit")
def test_main_ota_update_retries(mock_our_exit, mock_ota_class, capsys):
"""Test --ota-update retries on failure"""
# Create a temporary firmware file
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"fake firmware data")
firmware_file = f.name

try:
sys.argv = ["", "--ota-update", firmware_file, "--host", "192.168.1.100"]
mt_config.args = sys.argv

# Mock the OTA class to fail all 5 retries
mock_ota = MagicMock()
mock_ota_class.return_value = mock_ota
mock_ota.hash_bytes.return_value = b"\x00" * 32
mock_ota.hash_hex.return_value = "a" * 64
mock_ota.update.side_effect = Exception("Connection failed")

# Mock isinstance to return True
with patch("meshtastic.__main__.isinstance", return_value=True):
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_iface = MagicMock()
mock_iface.hostname = "192.168.1.100"
mock_iface.localNode = MagicMock(autospec=Node)
mock_tcp.return_value = mock_iface

with patch("time.sleep"):
main()

# Should have exhausted all retries and called our_exit
# Note: our_exit might be called twice - once for TCP check, once for failure
assert mock_our_exit.call_count >= 1
# Check the last call was for OTA failure
last_call_args = mock_our_exit.call_args[0][0]
assert "OTA update failed" in last_call_args

finally:
os.unlink(firmware_file)
35 changes: 35 additions & 0 deletions meshtastic/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,41 @@ def test_setOwner_valid_names(caplog):
assert re.search(r'p.set_owner.short_name:VN:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_start_ota_local_node():
"""Test startOTA on local node"""
iface = MagicMock(autospec=MeshInterface)
anode = Node(iface, 1234567890, noProto=True)
# Set up as local node
iface.localNode = anode

amesg = admin_pb2.AdminMessage()
with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
with patch.object(anode, "_sendAdmin") as mock_send_admin:
test_hash = b"\x01\x02\x03" * 8 # 24 bytes hash
anode.startOTA(ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash)

# Verify the OTA request was set correctly
assert amesg.ota_request.reboot_ota_mode == admin_pb2.OTAMode.OTA_WIFI
assert amesg.ota_request.ota_hash == test_hash
mock_send_admin.assert_called_once_with(amesg)


@pytest.mark.unit
def test_start_ota_remote_node_raises_error():
"""Test startOTA on remote node raises ValueError"""
iface = MagicMock(autospec=MeshInterface)
local_node = Node(iface, 1234567890, noProto=True)
remote_node = Node(iface, 9876543210, noProto=True)
iface.localNode = local_node

test_hash = b"\x01\x02\x03" * 8
with pytest.raises(ValueError, match="startOTA only possible in local node"):
remote_node.startOTA(
ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash
)


# TODO
# @pytest.mark.unitslow
# def test_waitForConfig():
Expand Down
Loading