#!/usr/bin/env python3
# A servicemanager for the Secure Gateway concept architecture.
#
# Author: Jonas Berg
# Copyright (c) 2016, Semcon Sweden AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are permitted
# provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
# the following disclaimer in the documentation and/or other materials provided with the distribution.
# 3. Neither the name of the Semcon Sweden AB nor the names of its contributors may be used to endorse or
# promote products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import collections
import logging
import signal
import sys
import types
assert sys.version_info >= (3, 3, 0), "Python version 3.3 or later required!"
import sgframework
import sgframework.constants as constants
[docs]def signal_handler(signum, frame):
# This seems necessary to get code coverage measurements from subprocesses to work
print('Handled Linux signal number:', signum)
signal.signal(signal.SIGTERM, signal_handler)
DESCRIPTIVE_TEXT_TEMPLATE = """
A Service Manager for the Secure Gateway concept architecture
It requires a MQTT broker (for example Mosquitto), and a MQTT client library (Paho).
It registers on the Secure Gateway network, and can connect to
the broker in a secure or insecure way.
The settings of the broker determines what is allowed. To connect in the secure way,
the directory of the certificate files must be specified.
The certificate files should be named:
CA file: {}
Certificate file: {}
Key file: {}
"""
MQTT_KEEPALIVE_TIME = 10 # Seconds
CLIENT_NAME = 'servicemanager'
[docs]def main():
epilog = DESCRIPTIVE_TEXT_TEMPLATE.format(sgframework.Resource.CA_CERTS,
sgframework.Resource.CERTFILE, sgframework.Resource.KEYFILE)
commandlineparser = argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
commandlineparser.add_argument('-v',
action='count', default=0,
help="Increase verbosity level. Can be repeated.")
commandlineparser.add_argument('-version',
action='version', version="Version of sgframework is {}.".
format(sgframework.__version__))
commandlineparser.add_argument('-host',
default='localhost',
help="Broker host name. Defaults to '%(default)s'.")
commandlineparser.add_argument('-port',
default=1883,
help="Broker port number. Defaults to %(default)s.")
commandlineparser.add_argument('-cert',
help="Directory for certificate files. Defaults to not using certificates.")
commandlineparser.add_argument('-qos',
type=int,
choices=[0, 1, 2],
default=0,
help="MQTT quality-of-service setting. Defaults to '%(default)s'.")
commandline = commandlineparser.parse_args()
if commandline.v == 1:
loglevel = logging.INFO
elif commandline.v >= 2:
loglevel = logging.DEBUG
else:
loglevel = logging.WARNING
logging.basicConfig(level=loglevel)
storage = {}
storage['available_resources'] = set()
storage['available_commands'] = collections.defaultdict(set)
storage['available_data'] = collections.defaultdict(set)
logging.info(" ")
logging.info(" ")
logging.info(" ***** Starting servicemanager *****")
manager = sgframework.framework.BaseFramework(CLIENT_NAME, commandline.host, commandline.port, commandline.cert)
manager._on_incoming_message = types.MethodType(on_incoming_message, manager)
manager._subscribe_to_inputsignals = types.MethodType(subscribe_to_inputsignals, manager)
manager.keepalive = MQTT_KEEPALIVE_TIME
manager.qos = commandline.qos
manager.userdata = storage
manager.start()
while True:
manager.loop()
[docs]def on_incoming_message(self, mqttclient, userdata, message):
"""MQTT callback at incoming messages.
Method signature according to Paho documentation.
"""
storage = userdata
## Extract information from the message ##
inputpayload = str(message.payload, encoding='latin1').strip() # Paho MQTT delivers bytes for Python3
inputtopic = str(message.topic).strip()
topic_hierarchy = inputtopic.split(constants.MQTT_TOPIC_SEPARATOR)
if len(topic_hierarchy) != constants.MQTT_TOPIC_DEPTH:
self.logger.warning("Received wrong MQTT topic structure: {}, payload: '{}'".format(inputtopic, inputpayload))
return
self.logger.debug("Received message. Topic: {}, payload: '{}'".format(inputtopic, inputpayload))
messagetype, servicename, signalname = topic_hierarchy
messagetype = messagetype.strip()
servicename = servicename.strip()
signalname = signalname.strip()
# Available resource
if messagetype == constants.PREFIX_RESOURCEAVAILABLE \
and signalname == constants.SUFFIX_PRESENCE \
and inputpayload == constants.PAYLOAD_TRUE:
storage['available_resources'].update([servicename])
self.logger.info("Available resource: {}".format(servicename))
# Available data and commands
elif messagetype == constants.PREFIX_COMMANDAVAILABLE and inputpayload == constants.PAYLOAD_TRUE:
self.logger.info("Available command: '{}' for resource '{}'".format(signalname, servicename))
storage['available_resources'].update([servicename])
storage['available_commands'][servicename].update([signalname])
elif messagetype == constants.PREFIX_DATAAVAILABLE and inputpayload == constants.PAYLOAD_TRUE:
self.logger.info("Available data: '{}' for resource '{}'".format(signalname, servicename))
storage['available_resources'].update([servicename])
storage['available_data'][servicename].update([signalname])
# Resource now offline
elif messagetype == constants.PREFIX_RESOURCEAVAILABLE \
and signalname == constants.SUFFIX_PRESENCE \
and inputpayload == constants.PAYLOAD_FALSE:
if servicename in storage['available_resources']:
self.logger.info("Resource now offline: '{}'".format(servicename))
if servicename in storage['available_commands']:
for commandname in storage['available_commands'][servicename]:
topic = constants.MQTT_TOPIC_TEMPLATE.format(constants.PREFIX_COMMANDAVAILABLE,
servicename,
commandname)
self.logger.info("Sending out command unavailable: '{}' for resource '{}'".format(
commandname, servicename))
mqttclient.publish(topic,
constants.PAYLOAD_FALSE,
qos=self.qos,
retain=True)
if servicename in storage['available_data']:
for dataname in storage['available_data'][servicename]:
topic = constants.MQTT_TOPIC_TEMPLATE.format(constants.PREFIX_DATAAVAILABLE,
servicename,
dataname)
self.logger.info("Sending out data unavailable: '{}' for resource '{}'".format(
dataname, servicename))
mqttclient.publish(topic,
constants.PAYLOAD_FALSE,
qos=self.qos,
retain=True)
storage['available_resources'] -= set([servicename])
storage['available_data'].pop(servicename, None)
storage['available_commands'].pop(servicename, None)
else:
self.logger.warning("Message about offline resource, but it was not listed before. " +
"Topic: '{}' Payload: '{}'".format(inputtopic, inputpayload))
# Messages not to handle
elif messagetype in [constants.PREFIX_DATAAVAILABLE, constants.PREFIX_COMMANDAVAILABLE] \
and inputpayload == constants.PAYLOAD_FALSE:
self.logger.debug("Unavailability message for data or command. Probably old from broker. " +
"Topic: '{}' Payload: '{}'".format(inputtopic, inputpayload))
else:
self.logger.warning("Wrong message structure. Topic: '{}' Payload: '{}'".format(inputtopic, inputpayload))
if __name__ == '__main__':
main()