# The tester module.
import queue
import time
from collections import UserDict
from typing import Dict, List, Mapping, Optional
import can
from .errors import Error
[docs]class DecodedMessage:
"""A decoded message.
"""
def __init__(self, name, signals):
self.name = name
self.signals = signals
class Messages(UserDict):
def __setitem__(self, message_name, value):
if getattr(self, '_frozen', False):
if message_name not in self.data:
raise KeyError(message_name)
self.data[message_name] = value
def __missing__(self, key):
raise Error(f"invalid message name '{key}'")
def _invert_signal_tree(
tree :List,
cur_mpx: Optional[Dict] = None,
ret: Optional[Dict] = None
) -> Dict:
"""The tree is laid out with two kinds of dicts. Single-element dict
keyed by string -> multiplexer, which is own dict keyed by
integers.
"""
if ret is None:
ret = {}
if cur_mpx is None:
cur_mpx = {}
for sigs in tree:
if isinstance(sigs, Mapping): # outer signal keyed by muliplexer name
(mpx_name, mpx_vals), = sigs.items()
for mpx_val, sig_tree in mpx_vals.items(): # inner signal
# keyed by multiplexer values
next_mpx = cur_mpx.copy()
next_mpx[mpx_name] = mpx_val
_invert_signal_tree(sig_tree, next_mpx, ret)
elif isinstance(sigs, str):
ret.setdefault(sigs,[]).append(set(cur_mpx.items()))
else:
raise TypeError(repr(sigs))
return ret
def invert_signal_tree(tree: List) -> Dict:
"""Return a mapping of signals to the multiplex settings that will
yield the signal.
{signal: [{mplexers}, {mplexers...}]}
"""
return _invert_signal_tree(tree)
class Listener(can.Listener):
def __init__(self, database, messages, input_queue, on_message):
self._database = database
self._messages = messages
self._input_queue = input_queue
self._on_message = on_message
def on_message_received(self, msg):
if msg.is_error_frame or msg.is_remote_frame:
return
try:
database_message = self._database.get_message_by_frame_id(
msg.arbitration_id)
except KeyError:
return
if database_message.name not in self._messages:
return
message = self._messages[database_message.name]
if not message.enabled:
return
decoded = DecodedMessage(database_message.name,
database_message.decode(msg.data,
message.decode_choices,
message.scaling))
if self._on_message:
self._on_message(decoded)
self._input_queue.put(decoded)
class Message(UserDict):
def __init__(self,
database,
can_bus,
input_list,
input_queue,
decode_choices,
scaling,
padding):
super().__init__()
self.database = database
self._mplex_map = invert_signal_tree(database.signal_tree)
self._can_bus = can_bus
self._input_queue = input_queue
self.decode_choices = decode_choices
self.scaling = scaling
self.padding = padding
self._input_list = input_list
self.enabled = True
self._can_message = None
self._periodic_task = None
self._signal_names = {s.name for s in self.database.signals}
self.update(self._prepare_initial_signal_values())
@property
def periodic(self):
return self.database.cycle_time is not None
def __getitem__(self, signal_name):
return self.data[signal_name]
def __setitem__(self, signal_name, value):
if signal_name not in self._signal_names:
raise KeyError(signal_name)
self.data[signal_name] = value
self._update_can_message()
def update(self, signals):
s = dict(signals)
new_signal_names = set(s) - self._signal_names
if new_signal_names:
raise KeyError(repr(new_signal_names))
self.data.update(s)
self._update_can_message()
def send(self, signals=None):
if signals is not None:
self.update(signals)
self._can_bus.send(self._can_message)
def expect(self, signals=None, timeout=None, discard_other_messages=True):
if signals is None:
signals = {}
decoded = self._expect_input_list(signals, discard_other_messages)
if decoded is None:
decoded = self._expect_input_queue(signals,
timeout,
discard_other_messages)
return decoded
def _expect_input_list(self, signals, discard_other_messages):
other_messages = []
while len(self._input_list) > 0:
message = self._input_list.pop(0)
decoded = self._filter_expected_message(message, signals)
if decoded is not None:
break
other_messages.append(message)
else:
decoded = None
if not discard_other_messages:
other_messages += self._input_list
del self._input_list[:]
self._input_list.extend(other_messages)
return decoded
def _expect_input_queue(self, signals, timeout, discard_other_messages):
if timeout is not None:
end_time = time.time() + timeout
remaining_time = timeout
else:
remaining_time = None
while True:
try:
message = self._input_queue.get(timeout=remaining_time)
except queue.Empty:
return
decoded = self._filter_expected_message(message, signals)
if decoded is not None:
return decoded
if not discard_other_messages:
self._input_list.append(message)
if timeout is not None:
remaining_time = end_time - time.time()
if remaining_time <= 0:
return
def _filter_expected_message(self, message, signals):
if message.name == self.database.name:
if all(message.signals[name] == signals[name] for name in signals):
return message.signals
def send_periodic_start(self):
if not self.enabled:
return
self._periodic_task = self._can_bus.send_periodic(
self._can_message,
self.database.cycle_time / 1000.0)
def send_periodic_stop(self):
if self._periodic_task is not None:
self._periodic_task.stop()
self._periodic_task = None
def _update_can_message(self):
arbitration_id = self.database.frame_id
extended_id = self.database.is_extended_frame
pruned_data = self.database.gather_signals(self.data)
data = self.database.encode(pruned_data,
self.scaling,
self.padding)
self._can_message = can.Message(arbitration_id=arbitration_id,
is_extended_id=extended_id,
data=data)
if self._periodic_task is not None:
self._periodic_task.modify_data(self._can_message)
def _prepare_initial_signal_values(self):
initial_sig_values = {}
# Choose a valid set of mux settings
mplex_settings = {}
for m0 in reversed(self._mplex_map.values()):
for m1 in m0:
mplex_settings.update(m1)
for signal in self.database.signals:
minimum = 0 if not signal.minimum else signal.minimum
maximum = 0 if not signal.maximum else signal.maximum
if signal.initial:
# use initial signal value (if set)
initial_sig_values[signal.name] = signal.initial
elif signal.is_multiplexer:
initial_sig_values[signal.name] = mplex_settings.get(signal.name, 0)
elif minimum <= 0 <= maximum:
# use 0 if in allowed range
initial_sig_values[signal.name] = 0
else:
# set at least some default value
initial_sig_values[signal.name] = minimum
return initial_sig_values
[docs]class Tester:
"""Test given node `dut_name` on given CAN bus `bus_name`.
`database` is a :class:`~cantools.database.can.Database` instance.
`can_bus` a CAN bus object, normally created using the python-can
package.
The `on_message` callback is called for every successfully decoded
received message. It is called with one argument, an
:class:`~cantools.tester.DecodedMessage` instance.
Here is an example of how to create a tester:
>>> import can
>>> import cantools
>>> can.rc['interface'] = 'socketcan'
>>> can.rc['channel'] = 'vcan0'
>>> can_bus = can.interface.Bus()
>>> database = cantools.database.load_file('tests/files/tester.kcd')
>>> tester = cantools.tester.Tester('PeriodicConsumer', database, can_bus, 'PeriodicBus')
"""
def __init__(self,
dut_name,
database,
can_bus,
bus_name=None,
on_message=None,
decode_choices=True,
scaling=True,
padding=False):
self._dut_name = dut_name
self._bus_name = bus_name
self._database = database
self._can_bus = can_bus
self._input_list = []
self._input_queue = queue.Queue()
self._messages = Messages()
self._is_running = False
# DUT name validation.
node_names = [node.name for node in database.nodes]
if dut_name and not any(name == dut_name for name in node_names):
raise Error(f"expected DUT name in {node_names}, but got '{dut_name}'")
# BUS name validation.
bus_names = [bus.name for bus in database.buses]
if len(bus_names) == 0:
if bus_name is not None:
raise Error(
f"expected bus name None as there are no buses defined in "
f"the database, but got '{bus_name}'")
elif not any(name == bus_name for name in bus_names):
raise Error(f"expected bus name in {bus_names}, but got '{bus_name}'")
for message in database.messages:
if message.bus_name == bus_name:
self._messages[message.name] = Message(message,
can_bus,
self._input_list,
self._input_queue,
decode_choices,
scaling,
padding)
listener = Listener(self._database,
self._messages,
self._input_queue,
on_message)
self._notifier = can.Notifier(can_bus, [listener])
self._messages._frozen = True
[docs] def start(self):
"""Start the tester. Starts sending enabled periodic messages.
>>> tester.start()
"""
for message in self._messages.values():
if self._dut_name and self._dut_name in message.database.senders:
continue
if not message.periodic:
continue
message.send_periodic_start()
self._is_running = True
[docs] def stop(self):
"""Stop the tester. Periodic messages will not be sent after this
call. Call :meth:`~cantools.tester.Tester.start()` to resume a
stopped tester.
>>> tester.stop()
"""
for message in self._messages.values():
message.send_periodic_stop()
self._is_running = False
@property
def messages(self):
"""Set and get signals in messages. Set signals takes effect
immediately for started enabled periodic messages. Call
:meth:`~cantools.tester.Tester.send()` for other messages.
>>> periodic_message = tester.messages['PeriodicMessage1']
>>> periodic_message
{'Signal1': 0, 'Signal2': 0}
>>> periodic_message['Signal1'] = 1
>>> periodic_message.update({'Signal1': 2, 'Signal2': 5})
>>> periodic_message
{'Signal1': 2, 'Signal2': 5}
"""
return self._messages
[docs] def enable(self, message_name):
"""Enable given message `message_name` and start sending it if its
periodic and the tester is running.
>>> tester.enable('PeriodicMessage1')
"""
message = self._messages[message_name]
message.enabled = True
if self._is_running and message.periodic:
message.send_periodic_start()
[docs] def disable(self, message_name):
"""Disable given message `message_name` and stop sending it if its
periodic, enabled and the tester is running.
>>> tester.disable('PeriodicMessage1')
"""
message = self._messages[message_name]
message.enabled = False
if self._is_running and message.periodic:
message.send_periodic_stop()
[docs] def send(self, message_name, signals=None):
"""Send given message `message_name` and optional signals `signals`.
>>> tester.send('Message1', {'Signal2': 10})
>>> tester.send('Message1')
"""
self._messages[message_name].send(signals)
[docs] def expect(self,
message_name,
signals=None,
timeout=None,
discard_other_messages=True):
"""Expect given message `message_name` and signal values `signals`
within `timeout` seconds.
Give `signals` as ``None`` to expect any signal values.
Give `timeout` as ``None`` to wait forever.
Messages are read from the input queue, and those not matching
given `message_name` and `signals` are discarded if
`discard_other_messages` is
``True``. :meth:`~cantools.tester.Tester.flush_input()` may be
called to discard all old messages in the input queue before
calling the expect function.
Returns the expected message, or ``None`` on timeout.
>>> tester.expect('Message2', {'Signal1': 13})
{'Signal1': 13, 'Signal2': 9}
"""
return self._messages[message_name].expect(signals,
timeout,
discard_other_messages)