trs_interface.protocol.decoder module

Most of the decoder module consists of functions that decode raw byte strings into dictionaries containing the message type and data fields. The majority of these are automatically generated and consequently not documented. A user of the protocol should not need to call these functions directly, and can instead rely on the StreamDecoder to split messages out of a continuous byte stream (such as serial port data), decode them, and serve the decoded dictionaries.

For example:

from serial import Serial
from trs_interface.protocol.decoder import StreamDecoder

# Open a serial port connection
serial_port = Serial("/dev/ttyACM0", timeout=0.1, write_timeout=0.1)
# Create the decoder for received messages
decoder = StreamDecoder(serial_port, on_error="warn")

# Split out messages waiting on the serial port and return them
for msg in decoder:
    print(f"Received message: id={msg.id}={msg.msg}, data={msg.data}, payload={msg.payload}")

Note, though, that an end user of trs_interface should not need to deal with the protocol layer at all, and can instead interact with the device purely through the TRSInterface class.

class trs_interface.protocol.decoder.StreamDecoder(stream=None, on_error='warn', max_message_size=4194304)

Bases: object

Create a StreamDecoder to decode a byte stream into messages to or from the TRSInterface.

The stream parameter should be an object from which data can be read. It must support the read() method.

The on_error parameter selects the action to take if invalid data is detected. If set to "continue", bytes are discarded whenever the byte sequence does not appear to be a valid message. If set to "warn" (the default, per the class signature), the behaviour is identical, but a warning message is also emitted. To instead abort stream decoding immediately and raise a RuntimeError, set it to "raise".
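The three on_error policies can be illustrated with a small standalone sketch. This is not the StreamDecoder implementation: the framing used here (a 0xAA start byte followed by a one-byte payload length) and the decode_frames function are hypothetical, chosen only to show how "continue", "warn" and "raise" might differ when invalid bytes are encountered.

```python
import warnings

# Hypothetical framing for illustration only (NOT the TRS protocol):
# each message is a start byte (0xAA), a one-byte payload length, then the payload.
START = 0xAA


def decode_frames(buffer: bytes, on_error: str = "warn"):
    """Yield payloads found in buffer, handling invalid bytes according to on_error."""
    if on_error not in ("continue", "warn", "raise"):
        raise ValueError(f"unknown on_error value: {on_error!r}")
    pos = 0
    while pos < len(buffer):
        if buffer[pos] != START:
            # Invalid data detected: apply the selected policy
            if on_error == "raise":
                raise RuntimeError(f"invalid byte 0x{buffer[pos]:02x} at offset {pos}")
            if on_error == "warn":
                warnings.warn(f"discarding invalid byte 0x{buffer[pos]:02x} at offset {pos}")
            pos += 1  # discard the byte and try to resynchronise
            continue
        if pos + 1 >= len(buffer):
            break  # length byte not yet available
        end = pos + 2 + buffer[pos + 1]
        if end > len(buffer):
            break  # payload not yet complete
        yield buffer[pos + 2:end]
        pos = end


# Two valid frames with a stray byte before each of them
data = b"\x00\xaa\x02hi\xff\xaa\x01!"
payloads = list(decode_frames(data, on_error="continue"))
```

With "continue" or "warn" both payloads are recovered and the stray bytes are dropped; with "raise" the first stray byte ends decoding with a RuntimeError.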

Parameters
  • stream – A data stream from which data can be read().

  • on_error – Action to take if invalid data is detected.

feed(data: bytes)

Add byte data to the input stream.

The input stream must support random access (via the seek() method), so this method cannot be used with sources such as serial port input.

Parameters

data – Byte array containing data to add.
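One plausible reason for the random-access requirement is that appending data means moving to the end of the stream and then restoring the read position. The sketch below shows that pattern with io.BytesIO. It is an assumption about the general technique, not the actual body of feed(), and append_to_stream is a hypothetical helper name.

```python
import io
import os


def append_to_stream(stream, data: bytes) -> None:
    """Append data to the end of a seekable stream, preserving the read position."""
    read_pos = stream.tell()      # remember where the reader currently is
    stream.seek(0, os.SEEK_END)   # jump to the end of the stream
    stream.write(data)            # add the new bytes
    stream.seek(read_pos)         # restore the read position


stream = io.BytesIO()
append_to_stream(stream, b"\x01\x02")
first = stream.read(1)            # consume one byte
append_to_stream(stream, b"\x03")
rest = stream.read()              # unread byte plus the newly appended one
```

A stream that cannot seek offers no way to restore the read position after writing, which is consistent with the restriction described above.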

trs_interface.protocol.decoder.arm(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_camera_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_camera_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_camera_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_chopper_divider(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_chopper_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_chopper_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_delay_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_laser_sync_period(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_laser_sync_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_ping(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_quadrature_direction(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_quadrature_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_quadrature_value(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.get_version(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_camera_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_camera_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_camera_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_chopper_divider(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_chopper_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_chopper_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_data(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_delay_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_laser_sync_period(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_laser_sync_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_ping(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_quadrature_direction(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_quadrature_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_quadrature_value(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_unknown_msg(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.got_version(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_camera_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_camera_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_camera_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_chopper_divider(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_chopper_sync_delay(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_chopper_sync_duration(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_delay_trig_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_laser_sync_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_quadrature_direction(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_quadrature_polarity(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.set_quadrature_value(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.start(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.stop(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.store_settings(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.trigger(data_raw: bytes) → Dict[str, Any]
trs_interface.protocol.decoder.decoder_for_id = {ID.GET_PING: <function get_ping>, ID.GOT_PING: <function got_ping>, ID.GOT_UNKNOWN_MSG: <function got_unknown_msg>, ID.GET_VERSION: <function get_version>, ID.STORE_SETTINGS: <function store_settings>, ID.GET_LASER_SYNC_POLARITY: <function get_laser_sync_polarity>, ID.GOT_LASER_SYNC_POLARITY: <function got_laser_sync_polarity>, ID.SET_LASER_SYNC_POLARITY: <function set_laser_sync_polarity>, ID.GET_CHOPPER_SYNCIN_POLARITY: <function get_chopper_syncin_polarity>, ID.GOT_CHOPPER_SYNCIN_POLARITY: <function got_chopper_syncin_polarity>, ID.SET_CHOPPER_SYNCIN_POLARITY: <function set_chopper_syncin_polarity>, ID.GET_CHOPPER_SYNCOUT_POLARITY: <function get_chopper_syncout_polarity>, ID.GOT_CHOPPER_SYNCOUT_POLARITY: <function got_chopper_syncout_polarity>, ID.SET_CHOPPER_SYNCOUT_POLARITY: <function set_chopper_syncout_polarity>, ID.GET_DELAY_TRIG_POLARITY: <function get_delay_trig_polarity>, ID.GOT_DELAY_TRIG_POLARITY: <function got_delay_trig_polarity>, ID.SET_DELAY_TRIG_POLARITY: <function set_delay_trig_polarity>, ID.GET_CAMERA_TRIG_POLARITY: <function get_camera_trig_polarity>, ID.GOT_CAMERA_TRIG_POLARITY: <function got_camera_trig_polarity>, ID.SET_CAMERA_TRIG_POLARITY: <function set_camera_trig_polarity>, ID.GET_QUADRATURE_POLARITY: <function get_quadrature_polarity>, ID.GOT_QUADRATURE_POLARITY: <function got_quadrature_polarity>, ID.SET_QUADRATURE_POLARITY: <function set_quadrature_polarity>, ID.GET_QUADRATURE_DIRECTION: <function get_quadrature_direction>, ID.GOT_QUADRATURE_DIRECTION: <function got_quadrature_direction>, ID.SET_QUADRATURE_DIRECTION: <function set_quadrature_direction>, ID.GET_CHOPPER_SYNC_DELAY: <function get_chopper_sync_delay>, ID.GOT_CHOPPER_SYNC_DELAY: <function got_chopper_sync_delay>, ID.SET_CHOPPER_SYNC_DELAY: <function set_chopper_sync_delay>, ID.GET_CHOPPER_SYNC_DURATION: <function get_chopper_sync_duration>, ID.GOT_CHOPPER_SYNC_DURATION: <function got_chopper_sync_duration>, 
ID.SET_CHOPPER_SYNC_DURATION: <function set_chopper_sync_duration>, ID.GET_CAMERA_SYNC_DELAY: <function get_camera_sync_delay>, ID.GOT_CAMERA_SYNC_DELAY: <function got_camera_sync_delay>, ID.SET_CAMERA_SYNC_DELAY: <function set_camera_sync_delay>, ID.GET_CAMERA_SYNC_DURATION: <function get_camera_sync_duration>, ID.GOT_CAMERA_SYNC_DURATION: <function got_camera_sync_duration>, ID.SET_CAMERA_SYNC_DURATION: <function set_camera_sync_duration>, ID.GET_CHOPPER_DIVIDER: <function get_chopper_divider>, ID.GOT_CHOPPER_DIVIDER: <function got_chopper_divider>, ID.SET_CHOPPER_DIVIDER: <function set_chopper_divider>, ID.GET_QUADRATURE_VALUE: <function get_quadrature_value>, ID.GOT_QUADRATURE_VALUE: <function got_quadrature_value>, ID.SET_QUADRATURE_VALUE: <function set_quadrature_value>, ID.TRIGGER: <function trigger>, ID.ARM: <function arm>, ID.START: <function start>, ID.STOP: <function stop>, ID.GET_LASER_SYNC_PERIOD: <function get_laser_sync_period>, ID.GOT_LASER_SYNC_PERIOD: <function got_laser_sync_period>, ID.GOT_VERSION: <function got_version>, ID.GOT_DATA: <function got_data>}

Dictionary for looking up a function which can decode data corresponding to a given message ID value.
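The lookup pattern this dictionary supports is a dispatch table keyed by message ID. The following standalone sketch mirrors that pattern with stand-in definitions: the ID values, the shape of the returned dictionaries, and the decode helper are all hypothetical and are not taken from the trs_interface source.

```python
from enum import IntEnum
from typing import Any, Callable, Dict


class ID(IntEnum):
    # Hypothetical numeric values; the real ID enum is defined by the library
    GET_PING = 1
    GOT_PING = 2


def get_ping(data_raw: bytes) -> Dict[str, Any]:
    # Stand-in decoder: a request carrying no data fields
    return {"id": ID.GET_PING, "data": {}}


def got_ping(data_raw: bytes) -> Dict[str, Any]:
    # Stand-in decoder: echo the raw bytes back as a payload field
    return {"id": ID.GOT_PING, "data": {"payload": data_raw}}


# Dispatch table: message ID -> function that decodes that message's bytes
decoder_for_id: Dict[ID, Callable[[bytes], Dict[str, Any]]] = {
    ID.GET_PING: get_ping,
    ID.GOT_PING: got_ping,
}


def decode(msg_id: int, data_raw: bytes) -> Dict[str, Any]:
    """Look up the decoder for msg_id and apply it to the raw bytes."""
    try:
        decode_fn = decoder_for_id[ID(msg_id)]
    except (ValueError, KeyError):
        raise ValueError(f"no decoder registered for message ID {msg_id}") from None
    return decode_fn(data_raw)


message = decode(2, b"\x2a")
```

Keeping the mapping in a dictionary means that adding a message type only requires one new entry, which suits automatically generated decoder functions.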