Source code for tests.test_servicemanager

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
test_servicemanager
----------------------------------

Tests for the servicemanager


"""

import os.path
import os
import signal
import subprocess
import sys
import time
import unittest

# The test suite relies on Python 3.3+ features (e.g. FileNotFoundError).
assert sys.version_info >= (3, 3, 0), "Python version 3.3 or later required!"

# Locate the directory holding the scripts, relative to this test file, and
# make it importable.
THIS_DIRECTORY = os.path.split(os.path.abspath(__file__))[0]
PARENT_DIRECTORY = os.path.split(THIS_DIRECTORY)[0]
SOURCE_DIRECTORY = os.path.join(PARENT_DIRECTORY, 'scripts/')
sys.path.append(SOURCE_DIRECTORY)

# Topics that are cleared on the broker after each test (see tearDown).
MQTT_TOPICS_TO_DELETE = [
    # Data signals
    'dataavailable/testresource2/testdata1',
    'dataavailable/testresource2/testdata2',
    'dataavailable/testresource2/testdata3',
    # Commands
    'commandavailable/testresource2/testcommand1',
    'commandavailable/testresource2/testcommand2',
    'commandavailable/testresource2/testcommand3',
    # Resource presence
    'resourceavailable/testresource2/presence',
]


class TestServicemanager(unittest.TestCase):
    """Integration test that runs the servicemanager script as a subprocess.

    The test drives the servicemanager through the ``mosquitto_pub`` and
    ``mosquitto_sub`` command line clients, which must be available on the
    PATH. NOTE(review): presumably an MQTT broker must be reachable with the
    clients' default settings -- confirm against the test environment.
    """

    # File (in the current working directory) that collects everything the
    # mosquitto_sub subprocess prints.
    OUTPUT_FILE_SUBSCRIBER = 'temporary-sub.txt'

    def setUp(self):
        """Prepare the environment handed to the servicemanager subprocess."""
        self.environment = os.environ.copy()
        self.environment["COVERAGE_PROCESS_START"] = os.path.join(
            THIS_DIRECTORY, "coveragerc")

    def tearDown(self):
        """Remove the subscriber output file and clear the test topics."""
        # Delete temporary files
        try:
            os.remove(self.OUTPUT_FILE_SUBSCRIBER)
        except FileNotFoundError:
            pass

        # Delete persistent MQTT messages ('-r' retained, '-n' null payload)
        for topic in MQTT_TOPICS_TO_DELETE:
            self._run_mosquitto_pub(['-t', topic, '-r', '-n'])

    @staticmethod
    def _run_mosquitto_pub(arguments, pause=0.2):
        """Start mosquitto_pub, give it ``pause`` seconds, then stop and reap it.

        Args:
            arguments: Command line arguments appended to ``mosquitto_pub``.
            pause: Seconds to sleep before the process is terminated.
        """
        publisher = subprocess.Popen(['mosquitto_pub'] + list(arguments))
        try:
            time.sleep(pause)
        finally:
            publisher.terminate()
            publisher.wait()  # Reap the child so no zombie is left behind

    @staticmethod
    def _stop_process(process):
        """Kill ``process`` if it is still running, then reap it."""
        if process.poll() is None:
            process.kill()
        process.wait()

    def testServicemanager(self):
        """Verify the 'False' availability messages for a stopped resource.

        A resource announces itself (presence, data and commands), then
        reports presence 'False'. The servicemanager is expected to publish
        'False' on each of the data and command availability topics of
        that resource. A few malformed messages are sent as well, to
        exercise the error handling.
        """
        subscriber = None
        servicemanager = None
        with open(self.OUTPUT_FILE_SUBSCRIBER, 'w') as subscriber_outputfile:
            try:
                subscriber = subprocess.Popen(
                    ['mosquitto_sub', '-v', '-t', '+/#'],
                    stdout=subscriber_outputfile,
                    stderr=subprocess.STDOUT)
                servicemanager = subprocess.Popen(
                    ['python3', 'scripts/servicemanager'],
                    stderr=subprocess.STDOUT,
                    env=self.environment)
                time.sleep(3)

                # Simulate a starting resource
                # (so the servicemanager can store the data)
                mqtt_message_list = [
                    ('resourceavailable/testresource2/presence', 'True'),
                    ('dataavailable/testresource2/testdata1', 'True'),
                    ('dataavailable/testresource2/testdata2', 'True'),
                    ('dataavailable/testresource2/testdata3', 'True'),
                    ('commandavailable/testresource2/testcommand1', 'True'),
                    ('commandavailable/testresource2/testcommand2', 'True'),
                    ('commandavailable/testresource2/testcommand3', 'True'),
                ]
                for topic, payload in mqtt_message_list:
                    self._run_mosquitto_pub(['-t', topic, '-m', payload])
                time.sleep(3)

                # Simulate a stopping resource
                self._run_mosquitto_pub(
                    ['-t', 'resourceavailable/testresource2/presence',
                     '-m', 'False'],
                    pause=2)

                # Test error handling
                mqtt_message_list = [
                    ('resourceavailable/testresource3/presence/extralevel',
                     'True'),
                    ('resourceavailable/testresource4/presence', 'False'),
                    ('a/b', '1.2.3'),
                    ('hatt/testresource5/presence', 'True'),
                ]
                for topic, payload in mqtt_message_list:
                    self._run_mosquitto_pub(['-t', topic, '-m', payload])

                # Terminate servicemanager with increasingly forceful signals
                servicemanager.send_signal(signal.SIGINT)
                time.sleep(1)
                servicemanager.terminate()
                time.sleep(1)
                servicemanager.kill()
                time.sleep(3)
                subscriber.kill()
                time.sleep(0.5)
            finally:
                # Never leave the helper processes running, even if the
                # test is interrupted by an exception half-way through.
                for process in (servicemanager, subscriber):
                    if process is not None:
                        self._stop_process(process)

            # Flush files
            subscriber_outputfile.flush()
            os.fsync(subscriber_outputfile.fileno())

        # Verify that the servicemanager has sent proper MQTT messages
        with open(self.OUTPUT_FILE_SUBSCRIBER, 'r') as subscriber_outputfile:
            text = ' \n'.join(subscriber_outputfile.readlines())

        expected_messages = (
            "dataavailable/testresource2/testdata1 False",
            "dataavailable/testresource2/testdata2 False",
            "dataavailable/testresource2/testdata3 False",
            "commandavailable/testresource2/testcommand1 False",
            "commandavailable/testresource2/testcommand2 False",
            "commandavailable/testresource2/testcommand3 False",
        )
        for expected in expected_messages:
            self.assertIn(expected, text)
if __name__ == '__main__':
    # Run all tests in this module.
    unittest.main(verbosity=2)

    # To run a single test instead, replace the call above with:
    #
    # suite = unittest.TestSuite()
    # suite.addTest(TestServicemanager("testServicemanager"))
    # unittest.TextTestRunner(verbosity=2).run(suite)