User Tools

Site Tools




The MQTTListener SIB Listener listens to a topic on an MQTT service that might differ from the internal MQTT transport. When a message arrives the workflow is triggered.




  • Subscribe: .custom.src.topic
    • on message:
      • Push relevant _sibheader information to _sibheader._store for later use
      • apply/copy SIBHeader
      • custom.inputFormat
      • Publish: .custom.topic
  • Subscribe: .custom.topic/REQUEUE
    • on message:
      • Publish: msg._sibheader.topic
  • Subscribe: .custom.topic/RESPONSE/#
    • on message:
      • custom.outputFormat
      • if stored _sibheader._store.[SID].responseTopic
        • reset msg._sibheader.topic and msg._sibheader.responseTopic from msg._sibheader._store
        • if custom.publishOutput
          • custom.publishOutput
        • else
          • Publish: msg._sibheader.responseTopic

Custom configuration

var MQTTServiceCustom = {
	_id: "mqttSample:1",
	_name: "SIB MQTT Listener Sample",
	buildSIBMsg: (topic, msg )=>{
		return msg;
	init: ( thisListener )=>{},
	inputFormat: (topic, msg, callback)=>{
		msg.fmtData = "Data has been formatted and modified";
		callback( msg );
	loggerCfg: {
		dest: {
			file: false,
			console: true,
			mqtt: true
	outputFormat: (topic, msg, callback)=>{
		msg.outData = "output sent";
	mqtt: "mqtt://localhost",
        publishOutput: (msg, mqttClient)=>{
                mqttClient.publish( msg.responseTopic, JSON.stringify( msg ) );
        publishInput: (msg, mqttClient)=>{
        	mqttClient.publish( msg._sibheader.topic, JSON.stringify( msg ) );
        responseFilter: (topic, msg )=>{
                return true;
	src: {
		mqtt: "mqtt://localhost",
		topic: "SIB/G2I.001/CRON/+",
		useenv: false
	topic: "SIB/Samples/MQTTListener"
module.exports = MQTTServiceCustom;
  • _id: The unique ID of this service
  • _name: A plaintext description of this service
  • buildSIBMsg(topic, msg): A method that builds a valid SIB Json message from the source MQTT input. This is required if the source MQTT does not send JSON encoded messages.
  • init(thisListener): If defined this method is executed when the service is starting up as the last step of initialization. thisListener is a reference to the instance of the listener that does the method call.
  • inputFormat(topic, msg, callback): The method that is run on service activation. The original message is passed as the argument and the return value from this method must be a JSON formatted message conforming to the SIB message standard.
  • loggerCfg: A JSON Object defining what logger actions should be active for this service (see Logger Configuration
  • outputFormat(topic, msg, callback): This message is run on the response data before publishing to the response topic. The response message is passed as the argument and the return data must be a JSON formatted message conforming to the SIB message standard.
  • publishOutput: If defined overrides the standard response publishing
  • publishInput: If defined overrides the standard publishing to the internal MQTT transfer
  • src: Object defining the source MQTT and topic to subscribe to
    • mqtt: The MQTT server to connect to for incoming messages, this might differ from the internal MQTT transport
    • topic: The topic to subscribe to for incoming messages, as of 1.2.7 this can also be an array of topics to subscribe to multiple topics
    • useenv: Boolean, default false. If true the src.topic will be prefixed with the process.env.NODE_ENV environment as all internal MQTT topics are
  • mqtt: The MQTT connection string for the internal MQTT transport
  • responseFilter(topic, msg): Method that if it exists and resturns false this response message is ignored
  • topic: The MQTT topic to publish to on the internal MQTT transport
sib/listeners/mqttlistener.txt · Last modified: 2019/09/20 07:32 by hubbe