RabbitListener

Overview

The RabbitListener SIB Listener consumes messages from RabbitMQ queues. When a message is received, the listener transforms it into a SIB message and triggers the workflow.

Details

Module: sib_rabbitlistener
Version: 1.3.2

Workflow

  • Start consuming queues defined in .custom.q
    • on message:
      • make SIBMessage from RabbitMQ message
      • apply SIBHeader
      • publish: .custom.topic
      • if noAck is false
        • wait for acknowledgement
        • if ack not received within time limit
          • execute basic.nack on message
  • Subscribe: .custom.topic/REQUEUE
    • on message:
      • send message to archive queue
      • apply .custom.inputFormat
      • Publish: msg._sibheader.topic
  • Subscribe: .custom.topic/RESPONSE/#
    • on message:
      • if ack message
        • perform basic.ack on message
      • if error message
        • send message to backout queue
        • send basic.ack if message is waiting for ack
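The acknowledgement branch of the workflow above (wait for an ack, nack on timeout) can be sketched as a small tracker of pending messages. This is only an illustration, not the module's actual code: createAckTracker and its callbacks are hypothetical names, and the real basic.ack and basic.nack calls are left to the caller (with the amqplib package these would typically be channel.ack and channel.nack).

```javascript
// Hypothetical helper: remembers messages that are waiting for an ack and
// calls onNack for any message that is not acked within ackTimeoutMs.
function createAckTracker(ackTimeoutMs, onNack, onAck) {
    const pending = new Map(); // msgId -> timer handle

    return {
        // Register a delivered message and start its ack timer.
        track(msgId) {
            const timer = setTimeout(() => {
                pending.delete(msgId); // forget the message
                onNack(msgId);         // caller performs basic.nack
            }, ackTimeoutMs);
            pending.set(msgId, timer);
        },
        // Handle an incoming ack. Returns false if the message is unknown
        // or has already timed out.
        ack(msgId) {
            const timer = pending.get(msgId);
            if (timer === undefined) {
                return false;
            }
            clearTimeout(timer);
            pending.delete(msgId);
            onAck(msgId);              // caller performs basic.ack
            return true;
        },
        // Number of messages still waiting for an ack.
        size() {
            return pending.size;
        }
    };
}
```

A listener would call track(msgId) after publishing a message with noAck set to false, and ack(msgId) when a matching ack arrives on the response topic.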

Custom configuration

rabbitlistener/custom.js
var RabbitServiceCustom = {
    _id: "rabbitSample:1",
    _name: "SIB RabbitMQ Listener Sample",
    loggerCfg: {
        dest: {
            file: false,
            console: true,
            mqtt: true
        }
    },
    inputFormat: (topic, msg, cb)=>{
        cb(msg);
    },
    rabbitmq: "amqp://localhost",
    ack_timeout: 60000,
    prefetch: 0,
    q: [{path: "path/to/queue", bo: "path/to/queue/bo", archive: "path/to/queue/archive", durable: true, noAck: false}],
    mqtt: "mqtt://localhost",
    topic: "SIB/Samples/RabbitListener"
};
 
module.exports = RabbitServiceCustom;
  • _id: The unique ID of this service
  • _name: A plaintext description of this service
  • inputFormat(topic, msg, cb): The method that is run on service activation. The original message is passed as msg, and the value passed to the callback cb must be a JSON-formatted message conforming to the SIB message standard.
  • rabbitmq: The RabbitMQ connection string
  • ack_timeout: How long the listener waits for an ack before sending a nack and forgetting the message
  • prefetch: How many unacknowledged messages to accept. If set to 1, the listener receives one message at a time, and the second message is not delivered until the first one is acked. 0 means no limit. Default is 0.
  • q: Definitions of the queues that should be consumed by the listener. Each queue definition should contain:
    • path: The name of the queue
    • bo: The name of the backout queue. Default is path + “/bo”
    • archive: The name of the archive queue. Default is path + “/archive”
    • durable: Set to true if the queue should be durable. If false, the queue will be removed when RabbitMQ is restarted. Default is true.
    • noAck: If set to true, messages will be removed from the MQ as soon as they are received by the listener. If false, messages will not be removed until they are acknowledged. Default is false.
  • loggerCfg: A JSON object defining which logger actions should be active for this service (see Logger Configuration)
  • mqtt: The MQTT connection string for the internal MQTT transport
  • topic: The MQTT topic to publish to on the internal MQTT transport
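The defaults listed for the queue definitions can be illustrated with a small normalising function. This is a sketch under the assumption that a missing property means "use the default"; normalizeQueue is a hypothetical helper and not part of sib_rabbitlistener.

```javascript
// Hypothetical helper: fills in the documented defaults for one entry of "q".
function normalizeQueue(def) {
    if (!def || typeof def.path !== "string" || def.path === "") {
        throw new Error("queue definition requires a non-empty path");
    }
    return {
        path: def.path,
        bo: def.bo ?? def.path + "/bo",                // default backout queue
        archive: def.archive ?? def.path + "/archive", // default archive queue
        durable: def.durable ?? true,                  // durable unless stated otherwise
        noAck: def.noAck ?? false                      // wait for acks unless stated otherwise
    };
}
```

With this sketch, a definition of {path: "path/to/queue"} expands to the same values as the full entry in the sample configuration.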

Ack message format

To perform a basic.ack, publish a message of the following format to the response MQTT topic:

{"ack": msg._sibheader.msgId}
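As a minimal sketch, a downstream service could build the ack as shown below. buildAck is a hypothetical helper. The topic it returns, the listener topic followed by "/RESPONSE", is an assumption derived from the listener's subscription to .custom.topic/RESPONSE/#; the exact sub-topic expected by a given installation may differ.

```javascript
// Hypothetical helper: builds the topic and payload for an ack message.
function buildAck(listenerTopic, msg) {
    const msgId = msg && msg._sibheader ? msg._sibheader.msgId : undefined;
    if (msgId === undefined || msgId === null) {
        throw new Error("message has no _sibheader.msgId");
    }
    return {
        // Assumption: any topic below <listenerTopic>/RESPONSE is accepted.
        topic: listenerTopic + "/RESPONSE",
        payload: JSON.stringify({ ack: msgId })
    };
}
```

The result can then be sent with any MQTT client, for example client.publish(ack.topic, ack.payload) when using the mqtt package.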

Backout message

To transfer a message to the backout queue, return the entire outgoing message with an added value of “error: true”. The message will be put in the backout queue and the original message will be acked if applicable.
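A backout message could be built along the following lines. This sketch assumes that "error" is a top-level property of the outgoing message and that the message is published to the same response topic as an ack; buildBackout is a hypothetical helper.

```javascript
// Hypothetical helper: returns the outgoing message, serialised as JSON,
// with error: true added. The original object is not modified.
function buildBackout(msg) {
    if (!msg || typeof msg !== "object") {
        throw new Error("backout requires the entire outgoing message");
    }
    return JSON.stringify(Object.assign({}, msg, { error: true }));
}
```

Because the whole message is returned, the _sibheader is preserved, which lets the listener match the backout to the original RabbitMQ message.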

sib/listeners/rabbitlistener.txt · Last modified: 2019/05/13 11:46 by hubbe