# Source code for tests.test_canadapter

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
test_canadapter
----------------------------------

Tests for the canadapter


"""

import collections
import io
import json
import logging
import os.path
import os
import signal
import subprocess
import sys
import time
import unittest

# unittest.mock, imported right below, is only in the standard library
# from Python 3.3 onwards, hence the version guard before the import.
assert sys.version_info >= (3, 3, 0), "Python version 3.3 or later required!"
import unittest.mock

# Append <parent of this directory>/scripts/ to sys.path so that the
# modules imported below can be found.
THIS_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
PARENT_DIRECTORY = os.path.dirname(THIS_DIRECTORY)
SOURCE_DIRECTORY = os.path.join(PARENT_DIRECTORY, 'scripts/')
sys.path.append(SOURCE_DIRECTORY)

import can4python as can

# NOTE(review): presumably these two are resolved via SOURCE_DIRECTORY
# appended above - confirm.
import canadapter
import canadapterlib

# CAN interface name handed to the 'candump' and 'cansend' commands in the tests.
VIRTUAL_CAN_BUS_NAME = "vcan0"
# Interface name used by testConstructorWrongCanInterface, which expects
# can.CanException when the canadapter is started with it.
NONEXISTING_CAN_BUS_NAME = "can25"

# Topics that TestCanAdapter.tearDown() clears after each test by publishing
# an empty retained message ('mosquitto_pub -r -n') to every one of them.
MQTT_TOPICS_TO_DELETE = [
    # Published under the name 'climateservice'
    "commandavailable/climateservice/aircondition",
    "dataavailable/climateservice/vehiclespeed",
    "dataavailable/climateservice/enginespeed",
    "dataavailable/climateservice/actualindoortemperature",
    "dataavailable/climateservice/aircondition",
    "resourceavailable/climateservice/presence",

    # Published under the name 'canadapter': command announcements
    "commandavailable/canadapter/no_interesting",
    "commandavailable/canadapter/ADAS_Pos2",
    "commandavailable/canadapter/aircondition",
    "commandavailable/canadapter/ADAS_Posn",
    "commandavailable/canadapter/ADAS_Seg",

    # Published under the name 'canadapter': data announcements
    "dataavailable/canadapter/ADAS_Seg",
    "dataavailable/canadapter/ADAS_Pos2",
    "dataavailable/canadapter/ADAS_Posn",
    "dataavailable/canadapter/ADAS_Posn_1",
    "dataavailable/canadapter/ADAS_Posn_2",
    "dataavailable/canadapter/ADAS_Posn_MsgType",
    "dataavailable/canadapter/ADAS_Posn_Offset",
    "dataavailable/canadapter/ADAS_ProfShort_CtrlPoint",
    "dataavailable/canadapter/vehiclespeed",
    "dataavailable/canadapter/no_interesting",
    "dataavailable/canadapter/enginespeed",
    "dataavailable/canadapter/actualindoortemperature",
    "dataavailable/canadapter/indoortemperature",
    "dataavailable/canadapter/aircondition",
    "dataavailable/canadapter/acstatus",

    # Published under the name 'canadapter': presence
    "resourceavailable/canadapter/presence",
]


class TestConverter(unittest.TestCase):
    """Tests for canadapterlib.Converter, run without CAN bus or MQTT broker.

    The configuration files are given as relative paths, so the tests must be
    run from the parent directory of this file's directory.
    """

    def _convert_sorted(self, converter, frame):
        """Convert a CAN frame to MQTT messages, sorted on the message name.

        Args:
            converter: The canadapterlib.Converter under test.
            frame: The CAN frame to convert.

        Returns:
            The result of converter.canframe_to_mqtt(frame), sorted in place
            on the first item of each message.
        """
        print("\nSENDING CAN FRAME: {}".format(frame))
        messages = converter.canframe_to_mqtt(frame)
        messages.sort(key=lambda x: x[0])
        print("RESULTING MQTT MESSAGES", messages)
        return messages

    def _assert_posn_1(self, message, cyc_cnt, msg_type, speed, offset):
        """Verify an 'ADAS_Posn_1' message and the values in its JSON payload.

        Args:
            message: A (name, payload) pair, where payload is a JSON string
                having a 'values' object.
            cyc_cnt: Expected 'ADAS_Posn_CycCnt' value.
            msg_type: Expected 'ADAS_Posn_MsgType' value.
            speed: Expected 'ADAS_Posn_Spd' value (compared approximately).
            offset: Expected 'Position_offset' value (compared approximately).
        """
        self.assertEqual(message[0], 'ADAS_Posn_1')
        values = json.loads(message[1])['values']
        self.assertEqual(values['ADAS_Posn_CycCnt'], cyc_cnt)
        self.assertEqual(values['ADAS_Posn_MsgType'], msg_type)
        self.assertAlmostEqual(values['ADAS_Posn_Spd'], speed)
        self.assertAlmostEqual(values['Position_offset'], offset)

    def test_converter_individual(self):
        """Convert individual signals, using the climateservice example files."""
        can_config = can.FilehandlerKcd.read(
            'examples/configfilesForCanadapter/climateservice_cansignals.kcd', None)
        can_config.ego_node_ids = ["1"]
        print(can_config.get_descriptive_ascii_art())

        converter = canadapterlib.Converter(
            can_config,
            'examples/configfilesForCanadapter/climateservice_mqttsignals.json')
        print(converter.get_descriptive_ascii_art())

        # REGISTER INCOMING COMMANDS
        commands = converter.get_definitions_incoming_mqtt_command()
        print("INCOMING MQTT COMMAND DEFINITIONS", commands)
        self.assertEqual(commands[0]['signalname'], 'aircondition')

        # REGISTER OUTGOING DATA
        data = converter.get_definitions_outgoing_mqtt_data()
        data.sort(key=lambda x: x['signalname'])
        print("OUTGOING MQTT DATA DEFINITIONS", data)
        self.assertEqual(data[0]['signalname'], 'actualindoortemperature')
        self.assertEqual(data[1]['signalname'], 'enginespeed')
        self.assertEqual(data[2]['signalname'], 'vehiclespeed')

        print("TO CAN", converter.mqttname_to_translationinfo)
        print("TO MQTT", converter.canframeid_to_translationinfos)

        # TO CAN
        for payload, expected_value in (('0', 0), ('1', 1)):
            signals = converter.mqtt_to_cansignals('aircondition', payload)
            print("RESULTING CAN", signals)
            self.assertIn('acstatus', signals)
            self.assertAlmostEqual(signals['acstatus'], expected_value)

        # TO MQTT
        # The message names are compared with assertEqual. The argument order
        # of assertIn is (member, container), so assertIn(name, 'literal')
        # would only be a substring check.
        frame = can.canframe.CanFrame(9, b'\x03\x31\x00\x00\x00\x00\x00\x00')
        messages = converter.canframe_to_mqtt(frame)
        print("RESULTING MQTT", messages)
        self.assertEqual(messages[0][0], 'actualindoortemperature')
        self.assertAlmostEqual(messages[0][1], 31.7)

        frame = can.canframe.CanFrame(8, b'\x14\xAD\x1E\x10\x00\x00\x00\x00')
        messages = converter.canframe_to_mqtt(frame)
        print("RESULTING MQTT", messages)
        messages.sort(key=lambda x: x[0])
        self.assertEqual(messages[0][0], 'enginespeed')
        self.assertAlmostEqual(messages[0][1], 1924)
        self.assertEqual(messages[1][0], 'vehiclespeed')
        self.assertAlmostEqual(messages[1][1], 52.93)

    def test_converter_aggregate(self):
        """Convert aggregated (JSON) signals, using the ADASIS example files."""
        logging.basicConfig(level=logging.DEBUG)

        can_config = can.FilehandlerKcd.read(
            'examples/configfilesForCanadapter/ADASIS_cansignals.kcd', None)
        can_config.ego_node_ids = ["1"]
        print(can_config.get_descriptive_ascii_art())

        converter = canadapterlib.Converter(
            can_config,
            'examples/configfilesForCanadapter/ADASIS_mqttsignals.json')
        print(converter.get_descriptive_ascii_art())

        # REGISTER INCOMING COMMANDS
        commands = converter.get_definitions_incoming_mqtt_command()
        print("INCOMING MQTT COMMAND DEFINITIONS", commands)
        self.assertEqual(commands[0]['signalname'], 'ADAS_Seg')

        # REGISTER OUTGOING DATA
        data = converter.get_definitions_outgoing_mqtt_data()
        data.sort(key=lambda x: x['signalname'])
        print("OUTGOING MQTT DATA DEFINITIONS", data)
        expected_names = ['ADAS_Posn_1',
                          'ADAS_Posn_2',
                          'ADAS_Posn_MsgType',
                          'ADAS_Posn_Offset',
                          'ADAS_ProfShort_CtrlPoint']
        for index, expected_name in enumerate(expected_names):
            self.assertEqual(data[index]['signalname'], expected_name)

        # TO CAN
        print("\nSENDING MQTT ...")
        signals = converter.mqtt_to_cansignals(
            'ADAS_Seg',
            '{"values": {"ADAS_Seg_MsgType": 1.0, '
            '"ADAS_Seg_Offset": 2.0, '
            '"ADAS_Seg_CycCnt": 3.0, '
            '"ADAS_Seg_EffSpdLmt": 4.0, '
            '"ADAS_Seg_EffSpdLmtType": 5.0}}')
        print("RESULTING CAN SIGNALS", signals)
        self.assertAlmostEqual(signals['ADAS_Seg_MsgType'], 1.0)
        self.assertAlmostEqual(signals['ADAS_Seg_Offset'], 2.0)
        self.assertAlmostEqual(signals['ADAS_Seg_CycCnt'], 3.0)
        self.assertAlmostEqual(signals['ADAS_Seg_EffSpdLmt'], 4.0)
        self.assertAlmostEqual(signals['ADAS_Seg_EffSpdLmtType'], 5.0)

        # TO MQTT
        frame = can.canframe.CanFrame(0x100, b'\x03\x31\x00\x00\x00\x00\x00\x00')
        messages = self._convert_sorted(converter, frame)
        self._assert_posn_1(messages[0], 0, 0, 0.0, 817.0)
        self.assertEqual(messages[1][0], 'ADAS_Posn_2')
        j = json.loads(messages[1][1])
        self.assertAlmostEqual(j['values']['ADAS_Posn_PosProbb'], 0.0)
        self.assertAlmostEqual(j['values']['ADAS_Posn_CurLane'], 0.0)
        self.assertEqual(messages[2][0], 'ADAS_Posn_MsgType')
        self.assertEqual(messages[2][1], 0)
        self.assertEqual(messages[3][0], 'ADAS_Posn_Offset')
        self.assertAlmostEqual(messages[3][1], 817.0)

        frame = can.canframe.CanFrame(0x100, b'\x00\x00\x00\x00\x00\x00\x00\x00')
        messages = self._convert_sorted(converter, frame)
        self._assert_posn_1(messages[0], 0, 0, 0.0, 0.0)

        frame = can.canframe.CanFrame(0x100, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
        messages = self._convert_sorted(converter, frame)
        self._assert_posn_1(messages[0], 3, 7, 511.0, 8191.0)

        frame = can.canframe.CanFrame(0x104, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
        messages = self._convert_sorted(converter, frame)
        self.assertEqual(messages[0][0], 'ADAS_ProfShort_CtrlPoint')
        self.assertEqual(messages[0][1], 1)
class TestCanAdapter(unittest.TestCase):
    """Tests for the canadapter, both in-process and as a separate process.

    The loop tests start the external commands 'candump', 'cansend',
    'mosquitto_sub', 'mosquitto_pub' and 'python3 scripts/canadapter', and
    use the CAN interface VIRTUAL_CAN_BUS_NAME.
    NOTE(review): presumably a MQTT broker reachable with the default
    settings of the mosquitto clients is required as well - confirm.

    All file paths are relative, so the tests must be run from the parent
    directory of this file's directory.
    """

    OUTPUT_FILE_CANDUMPER = 'temporary-candump.txt'
    OUTPUT_FILE_SUBSCRIBER = 'temporary-sub.txt'

    CLIMATE_CAN_CONFIG = 'examples/configfilesForCanadapter/climateservice_cansignals.kcd'
    CLIMATE_MQTT_CONFIG = 'examples/configfilesForCanadapter/climateservice_mqttsignals.json'
    ADASIS_CAN_CONFIG = 'examples/configfilesForCanadapter/ADASIS_cansignals.kcd'
    ADASIS_MQTT_CONFIG = 'examples/configfilesForCanadapter/ADASIS_mqttsignals.json'

    # JSON payload sent to the 'ADAS_Seg' command in testLoopAggregates
    ADAS_SEG_PAYLOAD = ('{"values": {"ADAS_Seg_MsgType": 1.0, '
                        '"ADAS_Seg_Offset": 2.0, '
                        '"ADAS_Seg_CycCnt": 3.0, '
                        '"ADAS_Seg_EffSpdLmt": 4.0, '
                        '"ADAS_Seg_EffSpdLmtType": 5.0}}')

    def setUp(self):
        """Prepare the environment variables used for the canadapter subprocess."""
        self.environment = os.environ.copy()
        self.environment["COVERAGE_PROCESS_START"] = os.path.join(THIS_DIRECTORY, "coveragerc")

    def tearDown(self):
        """Remove the temporary output files and the retained MQTT messages."""
        # Remove temporary files
        for path in (self.OUTPUT_FILE_CANDUMPER, self.OUTPUT_FILE_SUBSCRIBER):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

        # Delete persistent MQTT messages
        for topic in MQTT_TOPICS_TO_DELETE:
            self._run_briefly(['mosquitto_pub', '-t', topic, '-r', '-n'], 0.2)

    # Helpers #

    @staticmethod
    def _run_briefly(command, seconds):
        """Start a command, give it some time to finish, then terminate and reap it.

        Args:
            command: The command and its arguments, as a list of strings.
            seconds: Time to sleep before the process is terminated.
        """
        process = subprocess.Popen(command)
        time.sleep(seconds)
        process.terminate()
        # Reap the child, otherwise it is left as a zombie process.
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    @staticmethod
    def _collapse_whitespace(text):
        """Return text with every run of whitespace replaced by a single space.

        Used on the candump output, so that the checks do not depend on the
        exact column padding that candump uses.
        """
        return ' '.join(text.split())

    def _start_processes(self, candumper_outputfile, subscriber_outputfile,
                         topic_filter, canadapter_arguments):
        """Start the CAN recorder, the MQTT recorder and the canadapter.

        Args:
            candumper_outputfile: Open file receiving the candump output.
            subscriber_outputfile: Open file receiving the mosquitto_sub output.
            topic_filter: MQTT topic filter for mosquitto_sub.
            canadapter_arguments: Command line arguments for scripts/canadapter.

        Returns:
            A tuple (candumper, subscriber, canadapter_process) of Popen objects.
        """
        candumper = subprocess.Popen(['candump', VIRTUAL_CAN_BUS_NAME],
                                     stdout=candumper_outputfile,
                                     stderr=subprocess.STDOUT)
        subscriber = subprocess.Popen(['mosquitto_sub', '-v', '-t', topic_filter],
                                      stdout=subscriber_outputfile,
                                      stderr=subprocess.STDOUT)
        canadapter_process = subprocess.Popen(
            ['python3', 'scripts/canadapter'] + list(canadapter_arguments),
            env=self.environment)
        return candumper, subscriber, canadapter_process

    @staticmethod
    def _stop_processes(candumper, subscriber, canadapter_process,
                        candumper_outputfile, subscriber_outputfile):
        """Stop the processes from _start_processes(), and flush the output files.

        The canadapter is stopped with increasingly forceful signals. The
        recorders are kept running a while longer, in order to capture the
        messages resulting from the shutdown.
        """
        time.sleep(3)
        canadapter_process.send_signal(signal.SIGINT)
        time.sleep(1)
        canadapter_process.terminate()
        time.sleep(1)
        canadapter_process.kill()
        time.sleep(3)  # Wait for last will to be sent
        candumper.kill()
        subscriber.kill()
        time.sleep(0.2)

        # Reap the children, otherwise they are left as zombie processes.
        for process in (canadapter_process, candumper, subscriber):
            process.wait()

        for outputfile in (candumper_outputfile, subscriber_outputfile):
            outputfile.flush()
            os.fsync(outputfile.fileno())

    def _init_and_stop(self, arguments):
        """Initialise the canadapter in-process with the arguments, then stop it."""
        with unittest.mock.patch('sys.argv', ['scriptname'] + arguments):
            resource = canadapter.init_canadapter()
            time.sleep(0.5)
            resource.stop()

    # Constructor tests #

    def testConstructor(self):
        self._init_and_stop([self.CLIMATE_CAN_CONFIG, '-listentoallcan'])

    def testConstructorVerbose(self):
        self._init_and_stop([self.CLIMATE_CAN_CONFIG, '-listentoallcan', '-v'])

    def testConstructorVerbose2(self):
        self._init_and_stop([self.CLIMATE_CAN_CONFIG, '-listentoallcan', '-vv'])

    def testConstructorJSON(self):
        self._init_and_stop([self.CLIMATE_CAN_CONFIG,
                             '-mqttfile', self.CLIMATE_MQTT_CONFIG])

    def testConstructorWrongArguments(self):
        """Each of these command lines should exit with code 2."""
        wrong_arguments = [
            ['scriptname', '-j'],
            ['scriptname', self.CLIMATE_CAN_CONFIG, '-listentoallcan', '-k', '-10'],
            ['scriptname', self.CLIMATE_CAN_CONFIG, '-listentoallcan', '-t', '-10'],
            ['scriptname', self.CLIMATE_CAN_CONFIG, '-listentoallcan', '-t', '100000'],
            ['scriptname', self.CLIMATE_CAN_CONFIG],
            ['scriptname', '-mode', 'commandline'],
        ]

        for arguments in wrong_arguments:
            with unittest.mock.patch('sys.argv', arguments):
                with self.assertRaises(SystemExit) as context_manager:
                    canadapter.init_canadapter()
            self.assertEqual(context_manager.exception.code, 2,  # 'Incorrect usage'
                             msg="Arguments: {!r}".format(arguments))

    def testConstructorWrongCanInterface(self):
        arguments = ['scriptname', self.CLIMATE_CAN_CONFIG, '-listentoallcan',
                     '-i', NONEXISTING_CAN_BUS_NAME]
        with unittest.mock.patch('sys.argv', arguments):
            with self.assertRaises(can.CanException):
                canadapter.init_canadapter()

    def testHelpText(self):
        """The '-h' flag should print the usage text and exit with code 0."""
        original_stdout = sys.stdout
        try:
            temporary_stdout = io.StringIO()
            sys.stdout = temporary_stdout  # Redirect stdout

            with unittest.mock.patch('sys.argv', ['scriptname', '-h']):
                with self.assertRaises(SystemExit) as context_manager:
                    canadapter.init_canadapter()
            self.assertEqual(context_manager.exception.code, 0)  # 'OK exit'

            result = temporary_stdout.getvalue()
        finally:
            sys.stdout = original_stdout

        self.assertIn("usage:", result)
        self.assertIn("CAN interface name. Defaults to ", result)

    # Loop tests (canadapter running as a separate process) #

    def testLoopIndividualsignals(self):
        """Send MQTT commands and CAN frames, and verify the resulting traffic."""
        with open(self.OUTPUT_FILE_CANDUMPER, 'w') as candumper_outputfile, \
                open(self.OUTPUT_FILE_SUBSCRIBER, 'w') as subscriber_outputfile:
            processes = self._start_processes(
                candumper_outputfile, subscriber_outputfile,
                '+/climateservice/+',
                [self.CLIMATE_CAN_CONFIG,
                 '-mqttfile', self.CLIMATE_MQTT_CONFIG,
                 '-mqttname', 'climateservice',
                 '-vv'])
            try:
                time.sleep(5)

                # Send MQTT commands to the canadapter
                self._run_briefly(['mosquitto_pub', '-t', 'command/climateservice/aircondition',
                                   '-m', '1'], 1)
                self._run_briefly(['mosquitto_pub', '-t', 'command/climateservice/aircondition',
                                   '-m', '0'], 3)

                # Send CAN frames to the canadapter
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '009#0331000000000000'], 1)
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '008#14AD1E1000000000'], 1)

                # Send wrong CAN frames to the canadapter
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '009#03'], 1)
            finally:
                # Terminate, and flush files
                self._stop_processes(*processes,
                                     candumper_outputfile=candumper_outputfile,
                                     subscriber_outputfile=subscriber_outputfile)

            # Verify that the canadapter has sent proper MQTT messages
            with open(self.OUTPUT_FILE_SUBSCRIBER, 'r') as subscriber_inputfile:
                text = ' '.join(subscriber_inputfile.readlines())
            expected_mqtt_fragments = [
                "resourceavailable/climateservice/presence True",
                "dataavailable/climateservice/vehiclespeed True",
                "dataavailable/climateservice/enginespeed True",
                "dataavailable/climateservice/actualindoortemperature True",
                "dataavailable/climateservice/aircondition True",
                "commandavailable/climateservice/aircondition True",
                "data/climateservice/aircondition 1",
                "data/climateservice/aircondition 0",
                "data/climateservice/actualindoortemperature 31",
                "data/climateservice/enginespeed 1924",
                "data/climateservice/vehiclespeed 52",
                "resourceavailable/climateservice/presence False",
            ]
            for fragment in expected_mqtt_fragments:
                self.assertIn(fragment, text)

            # Verify that the canadapter has sent proper CAN frames
            with open(self.OUTPUT_FILE_CANDUMPER, 'r') as candumper_inputfile:
                text = self._collapse_whitespace(candumper_inputfile.read())
            self.assertIn("007 [8] 80 00 00 00 00 00 00 00", text)
            self.assertIn("007 [8] 00 00 00 00 00 00 00 00", text)

    def testLoopThrottleCanFrames(self):
        """Verify that the number of sent MQTT messages is limited by '-t'."""
        NUMBER_OF_INCOMING_CAN_FRAMES = 50
        THROTTLING_TIME = 1000  # milliseconds
        CAN_FRAME_INTERVAL = 0.2  # seconds
        ERROR_MARGIN = 3  # number of sent MQTT messages

        # The test is meaningful only if frames arrive faster than the throttling time.
        self.assertGreater(THROTTLING_TIME, CAN_FRAME_INTERVAL * 1000)

        with open(self.OUTPUT_FILE_CANDUMPER, 'w') as candumper_outputfile, \
                open(self.OUTPUT_FILE_SUBSCRIBER, 'w') as subscriber_outputfile:
            processes = self._start_processes(
                candumper_outputfile, subscriber_outputfile,
                '+/climateservice/+',
                [self.CLIMATE_CAN_CONFIG,
                 '-mqttfile', self.CLIMATE_MQTT_CONFIG,
                 '-mqttname', 'climateservice',
                 '-vv',
                 '-t', str(THROTTLING_TIME)])
            try:
                time.sleep(5)

                # Send CAN frames to the canadapter
                for _ in range(NUMBER_OF_INCOMING_CAN_FRAMES):
                    self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '008#14AD1E1000000000'],
                                      CAN_FRAME_INTERVAL)
            finally:
                # Terminate, and flush files
                self._stop_processes(*processes,
                                     candumper_outputfile=candumper_outputfile,
                                     subscriber_outputfile=subscriber_outputfile)

            # Verify the total number of CAN frames
            can_line_prefix = VIRTUAL_CAN_BUS_NAME + " 008 [8] "
            with open(self.OUTPUT_FILE_CANDUMPER, 'r') as candumper_inputfile:
                number_of_can_frames = sum(
                    1 for line in candumper_inputfile
                    if self._collapse_whitespace(line).startswith(can_line_prefix))
            self.assertEqual(number_of_can_frames, NUMBER_OF_INCOMING_CAN_FRAMES)

            # Calculate approximate number of sent MQTT messages
            can_send_time = NUMBER_OF_INCOMING_CAN_FRAMES * CAN_FRAME_INTERVAL  # seconds
            approx_number_of_vehiclespeed_messages = can_send_time * 1000 / THROTTLING_TIME

            # Verify that the canadapter has sent correct number of MQTT messages (throttling)
            with open(self.OUTPUT_FILE_SUBSCRIBER, 'r') as subscriber_inputfile:
                number_of_vehiclespeed_mqtt_messages = sum(
                    1 for line in subscriber_inputfile
                    if line.startswith("data/climateservice/vehiclespeed"))
            self.assertLessEqual(number_of_vehiclespeed_mqtt_messages,
                                 approx_number_of_vehiclespeed_messages + ERROR_MARGIN)
            self.assertGreaterEqual(number_of_vehiclespeed_mqtt_messages,
                                    approx_number_of_vehiclespeed_messages - ERROR_MARGIN)

    def testLoopAggregates(self):
        """Send an aggregated MQTT command and CAN frames, and verify the traffic."""
        with open(self.OUTPUT_FILE_CANDUMPER, 'w') as candumper_outputfile, \
                open(self.OUTPUT_FILE_SUBSCRIBER, 'w') as subscriber_outputfile:
            processes = self._start_processes(
                candumper_outputfile, subscriber_outputfile,
                '+/canadapter/+',
                [self.ADASIS_CAN_CONFIG,
                 '-mqttfile', self.ADASIS_MQTT_CONFIG,
                 '-mqttname', 'canadapter',
                 '-vv'])
            try:
                time.sleep(3)

                # Send MQTT commands to the canadapter
                self._run_briefly(['mosquitto_pub', '-t', 'command/canadapter/ADAS_Seg',
                                   '-m', self.ADAS_SEG_PAYLOAD], 3)

                # Send CAN frames to the canadapter
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '100#0331000000000000'], 1)
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '104#FFFFFFFFFFFFFFFF'], 1)
                self._run_briefly(['cansend', VIRTUAL_CAN_BUS_NAME, '100#FFFFFFFFFFFFFFFF'], 1)
            finally:
                # Terminate, and flush files
                self._stop_processes(*processes,
                                     candumper_outputfile=candumper_outputfile,
                                     subscriber_outputfile=subscriber_outputfile)

            # Verify that the canadapter has sent proper CAN frames
            with open(self.OUTPUT_FILE_CANDUMPER, 'r') as candumper_inputfile:
                raw_can_text = candumper_inputfile.read()
            print("CAN:\n", raw_can_text)
            self.assertIn("101 [8] 20 02 C0 00 00 00 25 00",
                          self._collapse_whitespace(raw_can_text))

            # Verify that the canadapter has sent proper MQTT messages
            with open(self.OUTPUT_FILE_SUBSCRIBER, 'r') as subscriber_inputfile:
                rows = subscriber_inputfile.read().splitlines()

            messages = collections.defaultdict(list)
            print("MQTT:")
            for row in rows:
                row = row.strip()
                if not row:
                    continue
                # The payload can contain spaces (JSON), so split on the first space only.
                # partition() also accepts rows having a topic but no payload.
                topic, _, payload = row.partition(' ')
                print("T {:42} P {}".format(topic, payload))
                messages[topic].append(payload)

            self.assertEqual(messages['resourceavailable/canadapter/presence'], ['True', 'False'])
            self.assertEqual(messages['dataavailable/canadapter/ADAS_Posn_1'][0], 'True')
            self.assertEqual(messages['dataavailable/canadapter/ADAS_Posn_2'][0], 'True')
            self.assertEqual(messages['dataavailable/canadapter/ADAS_Posn_Offset'][0], 'True')
            self.assertEqual(messages['dataavailable/canadapter/ADAS_Posn_MsgType'][0], 'True')
            self.assertEqual(messages['dataavailable/canadapter/ADAS_ProfShort_CtrlPoint'][0], 'True')
            self.assertEqual(messages['commandavailable/canadapter/ADAS_Seg'][0], 'True')

            self.assertEqual(int(messages['data/canadapter/ADAS_Posn_MsgType'][0]), 0)
            self.assertEqual(int(messages['data/canadapter/ADAS_Posn_MsgType'][1]), 7)
            self.assertAlmostEqual(float(messages['data/canadapter/ADAS_Posn_Offset'][0]), 817.0)
            self.assertAlmostEqual(float(messages['data/canadapter/ADAS_Posn_Offset'][1]), 8191.0)
            self.assertEqual(int(messages['data/canadapter/ADAS_ProfShort_CtrlPoint'][0]), 1)

            j = json.loads(messages['data/canadapter/ADAS_Posn_1'][0])
            self.assertEqual(j["values"]["ADAS_Posn_CycCnt"], 0)
            self.assertAlmostEqual(j["values"]["ADAS_Posn_Spd"], 0.0)
            self.assertAlmostEqual(j["values"]["Position_offset"], 817.0)
            self.assertEqual(j["values"]["ADAS_Posn_MsgType"], 0)

            j = json.loads(messages['data/canadapter/ADAS_Posn_1'][1])
            self.assertEqual(j["values"]["ADAS_Posn_CycCnt"], 3)
            self.assertAlmostEqual(j["values"]["ADAS_Posn_Spd"], 511.0)
            self.assertAlmostEqual(j["values"]["Position_offset"], 8191.0)
            self.assertEqual(j["values"]["ADAS_Posn_MsgType"], 7)

            j = json.loads(messages['data/canadapter/ADAS_Posn_2'][0])
            self.assertAlmostEqual(j["values"]["ADAS_Posn_PosProbb"], 0.0)
            self.assertAlmostEqual(j["values"]["ADAS_Posn_CurLane"], 0.0)

            j = json.loads(messages['data/canadapter/ADAS_Posn_2'][1])
            self.assertAlmostEqual(j["values"]["ADAS_Posn_PosProbb"], 31.0)
            self.assertAlmostEqual(j["values"]["ADAS_Posn_CurLane"], 7.0)
if __name__ == "__main__":
    # NOTE: Run this test from the parent directory, so that the relative
    # file paths match.

    # Run all tests
    unittest.main(verbosity=2)

    # Run a single test
    #
    # suite = unittest.TestSuite()
    # suite.addTest(TestCanAdapter("testLoopAggregates"))
    # suite.addTest(TestCanAdapter("testLoopThrottleCanFrames"))
    # suite.addTest(TestConverter("test_converter_individual"))
    # suite.addTest(TestConverter("test_converter_aggregate"))
    # unittest.TextTestRunner(verbosity=2).run(suite)