trs_interface.protocol.decoder module
Most of the decoder module consists of functions that decode raw byte strings into dictionaries containing the message type and data fields. The majority of these functions are automatically generated and are consequently not documented.
Fortunately, a user of the protocol should not need to know anything about these functions, and can instead rely on the StreamDecoder to split messages out of a continuous byte stream (such as serial port data), decode them, and serve the decoded dictionaries.
For example:
from serial import Serial
from trs_interface.protocol.decoder import StreamDecoder
# Open a serial port connection
serial_port = Serial("/dev/ttyACM0", timeout=0.1, write_timeout=0.1)
# Create the decoder for received messages
decoder = StreamDecoder(serial_port, on_error="warn")
# Split out messages waiting on the serial port and return them
for msg in decoder:
    print(f"Received message: id={msg.id}={msg.msg}, data={msg.data}, payload={msg.payload}")
Note, though, that an end user of trs_interface should not need to deal with anything at the protocol layer at all, and can instead interact with the device purely through the TRSInterface class.
- class trs_interface.protocol.decoder.StreamDecoder(stream=None, on_error='warn', max_message_size=4194304)
Bases: object
Create a StreamDecoder to decode a byte stream into messages to or from the TRSInterface.
The stream parameter should be an object from which data can be sourced. It should support the read() method.
The on_error parameter selects the action to take if invalid data is detected. If set to "continue", bytes will be discarded if the byte sequence does not appear to be a valid message. If set to "warn" (the default), the behaviour is identical, but a warning message will be emitted. To instead immediately abort the stream decoding and raise a RuntimeError, set to "raise".
- Parameters
stream – A data stream from which data can be read().
on_error – Action to take if invalid data is detected.
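The three on_error policies described above can be illustrated with a small, self-contained sketch. It does not use the real StreamDecoder or the real wire format: the framing (a 0xAA start byte, a one-byte length, then the payload), the function name toy_decode, and the use of io.BytesIO as the stream are all assumptions made purely for illustration.

```python
import io
import warnings


def toy_decode(stream, on_error="warn"):
    """Yield payloads from a toy framing: 0xAA, length byte, payload bytes.

    Hypothetical format for illustration only; not the trs_interface protocol.
    """
    if on_error not in ("continue", "warn", "raise"):
        raise ValueError(f"Unknown on_error value: {on_error!r}")
    while True:
        start = stream.read(1)
        if not start:
            return  # No more data available from the stream
        if start != b"\xaa":
            # Invalid byte: apply the selected error policy
            if on_error == "raise":
                raise RuntimeError(f"Invalid byte {start!r} in stream")
            if on_error == "warn":
                warnings.warn(f"Discarding invalid byte {start!r}")
            continue  # Discard the byte and try to resynchronise
        length = stream.read(1)
        if not length:
            return  # Truncated header: stop decoding
        payload = stream.read(length[0])
        if len(payload) < length[0]:
            return  # Truncated payload: stop decoding
        yield payload


# Two valid frames with one junk byte (0x00) between them
raw = b"\xaa\x02hi\x00\xaa\x03abc"

# "continue": the junk byte is dropped silently
silent = list(toy_decode(io.BytesIO(raw), on_error="continue"))

# "warn": same result, but a warning is emitted for the junk byte
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warned = list(toy_decode(io.BytesIO(raw), on_error="warn"))

# "raise": decoding is aborted with a RuntimeError at the junk byte
try:
    list(toy_decode(io.BytesIO(raw), on_error="raise"))
    raised = False
except RuntimeError:
    raised = True
```

Any object with a compatible read() method, such as an open serial port or a file opened in binary mode, could stand in for io.BytesIO in this sketch.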
- trs_interface.protocol.decoder.get_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.get_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.got_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.got_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.set_chopper_syncin_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.set_chopper_syncout_polarity(data_raw: bytes) → Dict[str, Any]
- trs_interface.protocol.decoder.decoder_for_id = {ID.GET_PING: <function get_ping>, ID.GOT_PING: <function got_ping>, ID.GOT_UNKNOWN_MSG: <function got_unknown_msg>, ID.GET_VERSION: <function get_version>, ID.STORE_SETTINGS: <function store_settings>, ID.GET_LASER_SYNC_POLARITY: <function get_laser_sync_polarity>, ID.GOT_LASER_SYNC_POLARITY: <function got_laser_sync_polarity>, ID.SET_LASER_SYNC_POLARITY: <function set_laser_sync_polarity>, ID.GET_CHOPPER_SYNCIN_POLARITY: <function get_chopper_syncin_polarity>, ID.GOT_CHOPPER_SYNCIN_POLARITY: <function got_chopper_syncin_polarity>, ID.SET_CHOPPER_SYNCIN_POLARITY: <function set_chopper_syncin_polarity>, ID.GET_CHOPPER_SYNCOUT_POLARITY: <function get_chopper_syncout_polarity>, ID.GOT_CHOPPER_SYNCOUT_POLARITY: <function got_chopper_syncout_polarity>, ID.SET_CHOPPER_SYNCOUT_POLARITY: <function set_chopper_syncout_polarity>, ID.GET_DELAY_TRIG_POLARITY: <function get_delay_trig_polarity>, ID.GOT_DELAY_TRIG_POLARITY: <function got_delay_trig_polarity>, ID.SET_DELAY_TRIG_POLARITY: <function set_delay_trig_polarity>, ID.GET_CAMERA_TRIG_POLARITY: <function get_camera_trig_polarity>, ID.GOT_CAMERA_TRIG_POLARITY: <function got_camera_trig_polarity>, ID.SET_CAMERA_TRIG_POLARITY: <function set_camera_trig_polarity>, ID.GET_QUADRATURE_POLARITY: <function get_quadrature_polarity>, ID.GOT_QUADRATURE_POLARITY: <function got_quadrature_polarity>, ID.SET_QUADRATURE_POLARITY: <function set_quadrature_polarity>, ID.GET_QUADRATURE_DIRECTION: <function get_quadrature_direction>, ID.GOT_QUADRATURE_DIRECTION: <function got_quadrature_direction>, ID.SET_QUADRATURE_DIRECTION: <function set_quadrature_direction>, ID.GET_CHOPPER_SYNC_DELAY: <function get_chopper_sync_delay>, ID.GOT_CHOPPER_SYNC_DELAY: <function got_chopper_sync_delay>, ID.SET_CHOPPER_SYNC_DELAY: <function set_chopper_sync_delay>, ID.GET_CHOPPER_SYNC_DURATION: <function get_chopper_sync_duration>, ID.GOT_CHOPPER_SYNC_DURATION: <function got_chopper_sync_duration>, 
ID.SET_CHOPPER_SYNC_DURATION: <function set_chopper_sync_duration>, ID.GET_CAMERA_SYNC_DELAY: <function get_camera_sync_delay>, ID.GOT_CAMERA_SYNC_DELAY: <function got_camera_sync_delay>, ID.SET_CAMERA_SYNC_DELAY: <function set_camera_sync_delay>, ID.GET_CAMERA_SYNC_DURATION: <function get_camera_sync_duration>, ID.GOT_CAMERA_SYNC_DURATION: <function got_camera_sync_duration>, ID.SET_CAMERA_SYNC_DURATION: <function set_camera_sync_duration>, ID.GET_CHOPPER_DIVIDER: <function get_chopper_divider>, ID.GOT_CHOPPER_DIVIDER: <function got_chopper_divider>, ID.SET_CHOPPER_DIVIDER: <function set_chopper_divider>, ID.GET_QUADRATURE_VALUE: <function get_quadrature_value>, ID.GOT_QUADRATURE_VALUE: <function got_quadrature_value>, ID.SET_QUADRATURE_VALUE: <function set_quadrature_value>, ID.TRIGGER: <function trigger>, ID.ARM: <function arm>, ID.START: <function start>, ID.STOP: <function stop>, ID.GET_LASER_SYNC_PERIOD: <function get_laser_sync_period>, ID.GOT_LASER_SYNC_PERIOD: <function got_laser_sync_period>, ID.GOT_VERSION: <function got_version>, ID.GOT_DATA: <function got_data>}
Dictionary for looking up a function which can decode data corresponding to a given message ID value.
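As a rough illustration of how such a lookup dictionary is typically used, the sketch below builds a two-entry dispatch table and routes raw data bytes to the matching decode function. The ToyID enum, its numeric values, the byte layout of the ping data, the keys of the returned dictionaries, and the decode helper are all hypothetical; only the pattern of mapping an ID to a function taking data_raw: bytes and returning Dict[str, Any] is taken from the signatures above.

```python
from enum import IntEnum
from typing import Any, Callable, Dict


class ToyID(IntEnum):
    """Hypothetical message IDs; the real values live in the library's ID enum."""
    GET_PING = 0
    GOT_PING = 1


def get_ping(data_raw: bytes) -> Dict[str, Any]:
    # A request that carries no data fields (assumed layout)
    return {"msg": "get_ping"}


def got_ping(data_raw: bytes) -> Dict[str, Any]:
    # Assumed layout: a single unsigned byte carrying a counter
    return {"msg": "got_ping", "counter": data_raw[0]}


# Lookup table from message ID to the function that decodes its data
toy_decoder_for_id: Dict[ToyID, Callable[[bytes], Dict[str, Any]]] = {
    ToyID.GET_PING: get_ping,
    ToyID.GOT_PING: got_ping,
}


def decode(msg_id: int, data_raw: bytes) -> Dict[str, Any]:
    """Find the decoder registered for msg_id and apply it to the raw bytes."""
    decoder = toy_decoder_for_id[ToyID(msg_id)]
    return decoder(data_raw)


result = decode(1, b"\x07")
```

Keeping the mapping in a plain dictionary means a new message type only needs one new function and one new entry, which suits automatically generated decoders.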