Source code for tests.test_minimal_taxiapp

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
test_minimal_taxiapp
----------------------------------

Tests for the minimal app example


"""
import os.path
import os
import signal
import subprocess
import sys
import time
import unittest

assert sys.version_info >= (3, 3, 0), "Python version 3.3 or later required!"

THIS_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
MQTT_TOPICS_TO_DELETE = [
                         "data/taxisignservice/state",
                         ]


[docs]class TestMinimalTaxiapp(unittest.TestCase): OUTPUT_FILE_APP = 'temporary-app.txt' OUTPUT_FILE_SUBSCRIBER = 'temporary-sub.txt'
[docs] def setUp(self): self.environment = os.environ.copy() self.environment["COVERAGE_PROCESS_START"] = os.path.join(THIS_DIRECTORY, "coveragerc")
[docs] def tearDown(self): # Remove temporary files os.remove(self.OUTPUT_FILE_APP) os.remove(self.OUTPUT_FILE_SUBSCRIBER) # Delete persistent MQTT messages for topic in MQTT_TOPICS_TO_DELETE: pub = subprocess.Popen(['mosquitto_pub', '-t', topic, '-r', '-n']) time.sleep(0.2) pub.terminate()
[docs] def testMinimalTaxiapp(self): with open(self.OUTPUT_FILE_SUBSCRIBER, 'w') as subscriber_outputfile, \ open(self.OUTPUT_FILE_APP, 'w') as app_outputfile: subcriber = subprocess.Popen(['mosquitto_sub', '-v', '-t', 'command/taxisignservice/#'], stdout=subscriber_outputfile, stderr=subprocess.STDOUT) app = subprocess.Popen([sys.executable, 'examples/minimal/minimaltaxiapp_with_signalhandler.py'], stdout=app_outputfile, stderr=subprocess.STDOUT, env=self.environment) time.sleep(5) # Verify that the app reacts to taxi sign state changes pub1 = subprocess.Popen(['mosquitto_pub', '-t', 'data/taxisignservice/state', '-m', 'False']) time.sleep(3) pub2 = subprocess.Popen(['mosquitto_pub', '-t', 'data/taxisignservice/state', '-m', 'True']) time.sleep(1) pub1.terminate() pub2.terminate() # Terminate app and flush files time.sleep(4) app.send_signal(signal.SIGINT) time.sleep(1) app.terminate() subcriber.kill() time.sleep(0.1) app_outputfile.flush() os.fsync(app_outputfile.fileno()) subscriber_outputfile.flush() os.fsync(subscriber_outputfile.fileno()) # Verify that the app has sent a command to the taxi sign with open(self.OUTPUT_FILE_SUBSCRIBER, 'r') as subscriber_outputfile: lines = subscriber_outputfile.readlines() self.assertEqual(len(lines), 1) self.assertIn("command/taxisignservice/state True", lines[0]) # Verify that the app has reacted to incoming taxi sign state changes with open(self.OUTPUT_FILE_APP, 'r') as app_outputfile: text = ' '.join(app_outputfile.readlines()) self.assertIn("The taxi sign is now off.", text) self.assertIn("The taxi sign is now on.", text)
if __name__ == '__main__': # Run all tests # unittest.main(verbosity=2) # Run a single test # # suite = unittest.TestSuite() # suite.addTest(TestCanBus("testReceiveNoData")) # unittest.TextTestRunner(verbosity=2).run(suite)