-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
lxa_iobus: node: add LxaRemoteNode that uses the HTTP API
This allows using lxa_iobus as a library and talking to a node that is connected to an IOBus server as if it was connected locally. This can come in handy for writing command line tools for node types that do not yet have a fully-fledged API. The features of a node are discoverd by enumerating the available protocols via SDO-over-HTTP. Signed-off-by: Leonard Göhrs <l.goehrs@pengutronix.de>
- Loading branch information
Showing
8 changed files
with
171 additions
and
97 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import contextlib | ||
import json | ||
import logging | ||
|
||
from .object_directory import ObjectDirectory | ||
from .products import find_product | ||
|
||
logger = logging.getLogger("lxa_iobus.base_node") | ||
|
||
|
||
class LxaBaseNode(object): | ||
def __init__(self, lss_address): | ||
self.lss_address = lss_address | ||
self.product = find_product(lss_address) | ||
self.name = self.product.name() | ||
self.address = ".".join(["{:08x}".format(i) for i in lss_address]) | ||
|
||
self.locator_state = False | ||
|
||
async def setup_object_directory(self): | ||
self.od = await ObjectDirectory.scan( | ||
self, | ||
self.product.ADC_NAMES, | ||
self.product.INPUT_NAMES, | ||
self.product.OUTPUT_NAMES, | ||
) | ||
|
||
async def ping(self): | ||
try: | ||
if "locator" in self.od: | ||
self.locator_state = await self.od.locator.active() | ||
else: | ||
# The device does not advertise having an IOBus locator. | ||
# Try a CANopen standard endpoint instead | ||
await self.od.manufacturer_device_name.name() | ||
|
||
return True | ||
|
||
except TimeoutError: | ||
return False | ||
|
||
async def set_locator_state(self, state): | ||
if state: | ||
await self.od.locator.enable() | ||
else: | ||
await self.od.locator.disable() | ||
|
||
self.locator_state = state | ||
|
||
async def invoke_isp(self): | ||
# The node will enter the bootloader immediately, | ||
# so we will not receive a response. | ||
with contextlib.suppress(TimeoutError): | ||
await self.od.bootloader.enter() | ||
|
||
async def info(self): | ||
device_name = await self.od.manufacturer_device_name.name() | ||
hardware_version = await self.od.manufacturer_hardware_version.version() | ||
software_version = await self.od.manufacturer_software_version.version() | ||
|
||
# check for updates | ||
update_name = "" | ||
|
||
bundled_firmware_version = self.product.FIRMWARE_VERSION | ||
bundled_firmware_file = self.product.FIRMWARE_FILE | ||
|
||
if (bundled_firmware_version is not None) and (bundled_firmware_file is not None): | ||
raw_version = software_version.split(" ")[1] | ||
version_tuple = tuple([int(i) for i in raw_version.split(".")]) | ||
|
||
if version_tuple < bundled_firmware_version: | ||
update_name = bundled_firmware_file | ||
|
||
info = { | ||
"device_name": device_name, | ||
"address": self.address, | ||
"hardware_version": hardware_version, | ||
"software_version": software_version, | ||
"update_name": update_name, | ||
} | ||
|
||
if "version_info" in self.od: | ||
info["protocol_version"] = await self.od.version_info.protocol() | ||
info["board_version"] = await self.od.version_info.board() | ||
info["serial_string"] = await self.od.version_info.serial() | ||
info["vendor_name"] = await self.od.version_info.vendor_name() | ||
info["notes"] = await self.od.version_info.notes() | ||
|
||
# If the json is not valid we just leave it as string instead | ||
with contextlib.suppress(json.decoder.JSONDecodeError): | ||
info["notes"] = json.loads(info["notes"]) | ||
|
||
return info |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import logging | ||
|
||
from aiohttp import ClientResponseError, ClientSession | ||
|
||
from lxa_iobus.canopen import SdoAbort | ||
|
||
from .base_node import LxaBaseNode | ||
|
||
logger = logging.getLogger("lxa_iobus.remote_node") | ||
|
||
|
||
class LxaRemoteNode(LxaBaseNode): | ||
@classmethod | ||
async def new(cls, base_url, node_name): | ||
session = ClientSession(raise_for_status=True) | ||
|
||
response = await session.get(f"{base_url}/nodes/{node_name}/") | ||
body = await response.json() | ||
address = body["result"]["info"]["address"] | ||
lss_address = list(int(a) for a in address.split(".")) | ||
|
||
this = cls(session, base_url, node_name, lss_address) | ||
|
||
await this.setup_object_directory() | ||
|
||
return this | ||
|
||
def __repr__(self): | ||
return f"<LxaRemoteNode(address={self.address}, base_url={self.base_url})>" | ||
|
||
def __init__(self, session, base_url, node_name, lss_address): | ||
super().__init__(lss_address) | ||
|
||
self.session = session | ||
self.base_url = base_url | ||
self.node_name = node_name | ||
|
||
def _sdo_url(self, index, sub_index): | ||
return f"{self.base_url}/api/v2/node/{self.node_name}/raw_sdo/0x{index:04x}/{sub_index}" | ||
|
||
async def sdo_read(self, index, sub_index, _timeout=None): | ||
try: | ||
response = await self.session.get(self._sdo_url(index, sub_index)) | ||
return await response.read() | ||
except ClientResponseError as e: | ||
logger.warn(f"sdo_read() failed for node {self.name}: {e}") | ||
|
||
# We do not have all the information we need for a proper SdoAbort, | ||
# but no node id and error id "General error" should be good enough. | ||
raise SdoAbort(0, index, sub_index, 0x08000000) from e | ||
|
||
async def sdo_write(self, index, sub_index, data, _timeout=None): | ||
try: | ||
await self.session.post(self._sdo_url(index, sub_index), data=data) | ||
except ClientResponseError as e: | ||
logger.warn(f"sdo_write() failed for node {self.name}: {e}") | ||
|
||
# We do not have all the information we need for a proper SdoAbort, | ||
# but no node id and error id "General error" should be good enough. | ||
raise SdoAbort(0, index, sub_index, 0x08000000) from e | ||
|
||
async def close(self): | ||
await self.session.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters